Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove processing/scheduling logic from StreamingDataflowWorker #31317

Merged
merged 10 commits into from
May 31, 2024

Conversation

m-trieu
Copy link
Contributor

@m-trieu m-trieu commented May 16, 2024

includes #31148

StreamingDataflowWorker will eventually just be constructing the harness runtime (different implementations for appliance, dispatched, fanout etc) and the main method.

R: @scwhittle


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Copy link
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @chamikaramj added as fallback since no labels match configuration

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Copy link
Contributor

@scwhittle scwhittle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An initial pass, didn't look much at tests yet


@AutoValue
public abstract class ExecutionState {
@Internal
public abstract class ExecutionState implements AutoCloseable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment here?
We have a bunch of context-y things for processing floating around. Would be good to note that this is per-computation and that it can be reused across bundles/work with resetting of things it contains.

Separately I wonder if we should merge this and StreamingModeExecutionStateContext?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will be worth exploring cleaning up StreamingModeExecutionContext + ExecutionState

looks like streaming.worker.ExecutionState (there is another execution state StreamingModeExecutionState which extends DataflowExecutionState which extends core.metrics.ExecutionState which seems more for metric sampling)

There could be a lot of clean up here

Added a comment and TODO to clean up as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExecutionState is also specific to a computation so will add comment and rename accordingly.

@m-trieu
Copy link
Contributor Author

m-trieu commented May 29, 2024

test failure is unrelated @scwhittle

Copy link
Contributor

@scwhittle scwhittle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the delay, nothing too major left

return backlogBytes;
}

public boolean workIsFailed() {
return workIsFailed.get();
return Optional.ofNullable(work).map(Work::isFailed).orElse(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

work is not marked Nullable, can the ofNullable be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it ever possible that this is called when work is not set?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like work should be marked Nullable since it's not set to non-null in constructor

closeWorkExecutor();
}

public final void closeWorkExecutor() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment? when should this be called? is it possible to reuse after this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can drop this and just use ComputationWorkExexutor.close

Seems like it's just used in tests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed, letting the tests just use ComputationWorkExexutor.invalidate()

public abstract String computationId();

/**
* {@link WindmillStream.GetDataStream} that connects to the backend Windmill worker handling
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update comment to just say how to handle GetData requests (not necessarily a stream for appliance)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

*/
public abstract Consumer<Commit> workCommitter();

public abstract Optional<WindmillStream.GetDataStream> getDataStream();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comment

is this just exposed so we can do heartbeat batching since we have keyedDataFetcher otherwise? If so perhaps it would be better to change to some more restricted interface in a follow up PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea since for refreshing work we need to group all active work by getDataStream

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

WorkCommitter workCommitter = mock(WorkCommitter.class);
doNothing().when(workCommitter).commit(any(Commit.class));
WindmillStream.GetDataStream getDataStream = mock(WindmillStream.GetDataStream.class);
when(getDataStream.requestKeyedData(anyString(), any()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

private static final Logger LOG = LoggerFactory.getLogger(StreamingCommitFinalizer.class);
private static final Duration DEFAULT_CACHE_ENTRY_EXPIRY = Duration.ofMinutes(5L);
private final Cache<Long, Runnable> onCommitFinalizedCache;
private final BoundedQueueExecutor workExecutor;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rename throughout to just executor or finalizationExecutor

since we're not dealing with work here it's a little confusing to call workExecutor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

/**
* Calls callbacks for WorkItem to mark that commit has been persisted (finalized) to the backing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the comment could be clearer how about something like

When this method is called, the commits associated with the provided finalizeIds have been successfully persisted in the backing state store. If the commitCallback for the finalizationId is still cached it is invoked.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

/**
* Stores a map of user worker generated id's and callbacks to execute once a commit has been
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about being more specific "user worker generated finalization ids"
since we have lots of ids floating around :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea done

@scwhittle
Copy link
Contributor

I think the examples failure is unrelated so don't bother investigating it.

Copy link

codecov bot commented May 30, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 68.51%. Comparing base (5c30b1d) to head (8da5f40).
Report is 42 commits behind head on master.

Current head 8da5f40 differs from pull request most recent head 570c4ef

Please upload reports for the commit 570c4ef to get more accurate results.

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #31317      +/-   ##
============================================
- Coverage     68.55%   68.51%   -0.05%     
+ Complexity    14921    14823      -98     
============================================
  Files          2636     2636              
  Lines        222080   221689     -391     
  Branches      11825    11784      -41     
============================================
- Hits         152238   151881     -357     
+ Misses        63647    63629      -18     
+ Partials       6195     6179      -16     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Contributor

@scwhittle scwhittle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a couple more comments

return backlogBytes;
}

public boolean workIsFailed() {
return workIsFailed.get();
return Optional.ofNullable(work).map(Work::isFailed).orElse(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like work should be marked Nullable since it's not set to non-null in constructor

* {@link DataflowExecutionContext} for use in streaming mode. Contains cached readers and Beam
* state pertaining to a processing its owning computation. Can be reused across processing
* different WorkItems for the same computation.
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this can be removed to catch the missing work Nullable and perhaps other things?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compiler will complain with under initialized error will update to prevent both

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are quite a bit of nullability issues not just with work nullability

added a comment to deal with this a follow up PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if the separate state via biulder and then accessor instead of just local variables will lead to performance regression since this is on the hot path.

Since we don't actually clear the workScopedState between work items it seems that the null check will only help before start is ever called and then the additional machinery will be a no-op but more expensive.

Can you revert back to existing local variables for now? Perhaps we can improve nullability/correctness verification while merging this and the ComputationWorkExecutor and do a perf comparison. I don't want to make this big change riskier.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done sgtm!

@m-trieu
Copy link
Contributor Author

m-trieu commented May 31, 2024

run java precommit

* {@link DataflowExecutionContext} for use in streaming mode. Contains cached readers and Beam
* state pertaining to a processing its owning computation. Can be reused across processing
* different WorkItems for the same computation.
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if the separate state via biulder and then accessor instead of just local variables will lead to performance regression since this is on the hot path.

Since we don't actually clear the workScopedState between work items it seems that the null check will only help before start is ever called and then the additional machinery will be a no-op but more expensive.

Can you revert back to existing local variables for now? Perhaps we can improve nullability/correctness verification while merging this and the ComputationWorkExecutor and do a perf comparison. I don't want to make this big change riskier.

Copy link
Contributor

@scwhittle scwhittle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM just going to run some internal tests before merging

@scwhittle
Copy link
Contributor

Run Java PreCommit

@scwhittle scwhittle merged commit f93a67a into apache:master May 31, 2024
16 of 17 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants