-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
remove processing/scheduling logic from StreamingDataflowWorker #31317
Conversation
Assigning reviewers. If you would like to opt out of this review, comment R: @chamikaramj added as fallback since no labels match configuration Available commands:
The PR bot will only process comments in the main thread (not review comments). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An initial pass, didn't look much at tests yet
...-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java
Show resolved
Hide resolved
...worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java
Outdated
Show resolved
Hide resolved
...worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java
Outdated
Show resolved
Hide resolved
...a/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutionState.java
Outdated
Show resolved
Hide resolved
...a/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutionState.java
Outdated
Show resolved
Hide resolved
...java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
Outdated
Show resolved
Hide resolved
...java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java
Show resolved
Hide resolved
...java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java
Outdated
Show resolved
Hide resolved
...java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java
Outdated
Show resolved
Hide resolved
...ker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
Outdated
Show resolved
Hide resolved
...ker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
Outdated
Show resolved
Hide resolved
...ker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
Outdated
Show resolved
Hide resolved
...ker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
Show resolved
Hide resolved
...ker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
Outdated
Show resolved
Hide resolved
|
||
@AutoValue | ||
public abstract class ExecutionState { | ||
@Internal | ||
public abstract class ExecutionState implements AutoCloseable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a comment here?
We have a bunch of context-y things for processing floating around. Would be good to note that this is per-computation and that it can be reused across bundles/work with resetting of things it contains.
Separately I wonder if we should merge this and StreamingModeExecutionStateContext?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it will be worth exploring cleaning up StreamingModeExecutionContext + ExecutionState
looks like streaming.worker.ExecutionState
(there is another execution state StreamingModeExecutionState which extends DataflowExecutionState
which extends core.metrics.ExecutionState
which seems more for metric sampling)
There could be a lot of clean up here
Added a comment and TODO to clean up as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ExecutionState
is also specific to a computation so will add comment and rename accordingly.
...a/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutionState.java
Outdated
Show resolved
Hide resolved
...g/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java
Outdated
Show resolved
Hide resolved
...org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
Outdated
Show resolved
Hide resolved
...java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java
Outdated
Show resolved
Hide resolved
...java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ExecuteWorkResult.java
Outdated
Show resolved
Hide resolved
...src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java
Show resolved
Hide resolved
test failure is unrelated @scwhittle |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the delay, nothing too major left
return backlogBytes; | ||
} | ||
|
||
public boolean workIsFailed() { | ||
return workIsFailed.get(); | ||
return Optional.ofNullable(work).map(Work::isFailed).orElse(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
work is not marked Nullable, can the ofNullable be removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it ever possible that this is called when work is not set?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like work should be marked Nullable since it's not set to non-null in constructor
...ker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
Outdated
Show resolved
Hide resolved
closeWorkExecutor(); | ||
} | ||
|
||
public final void closeWorkExecutor() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment? when should this be called? is it possible to reuse after this method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can drop this and just use ComputationWorkExexutor.close
Seems like it's just used in tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed, letting the tests just use ComputationWorkExexutor.invalidate()
public abstract String computationId(); | ||
|
||
/** | ||
* {@link WindmillStream.GetDataStream} that connects to the backend Windmill worker handling |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update comment to just say how to handle GetData requests (not necessarily a stream for appliance)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
*/ | ||
public abstract Consumer<Commit> workCommitter(); | ||
|
||
public abstract Optional<WindmillStream.GetDataStream> getDataStream(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add comment
is this just exposed so we can do heartbeat batching since we have keyedDataFetcher otherwise? If so perhaps it would be better to change to some more restricted interface in a follow up PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea since for refreshing work we need to group all active work by getDataStream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
WorkCommitter workCommitter = mock(WorkCommitter.class); | ||
doNothing().when(workCommitter).commit(any(Commit.class)); | ||
WindmillStream.GetDataStream getDataStream = mock(WindmillStream.GetDataStream.class); | ||
when(getDataStream.requestKeyedData(anyString(), any())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
private static final Logger LOG = LoggerFactory.getLogger(StreamingCommitFinalizer.class); | ||
private static final Duration DEFAULT_CACHE_ENTRY_EXPIRY = Duration.ofMinutes(5L); | ||
private final Cache<Long, Runnable> onCommitFinalizedCache; | ||
private final BoundedQueueExecutor workExecutor; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: rename throughout to just executor or finalizationExecutor
since we're not dealing with work here it's a little confusing to call workExecutor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
...g/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java
Show resolved
Hide resolved
} | ||
|
||
/** | ||
* Calls callbacks for WorkItem to mark that commit has been persisted (finalized) to the backing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the comment could be clearer how about something like
When this method is called, the commits associated with the provided finalizeIds have been successfully persisted in the backing state store. If the commitCallback for the finalizationId is still cached it is invoked.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
|
||
/** | ||
* Stores a map of user worker generated id's and callbacks to execute once a commit has been |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about being more specific "user worker generated finalization ids"
since we have lots of ids floating around :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea done
I think the examples failure is unrelated so don't bother investigating it. |
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #31317 +/- ##
============================================
- Coverage 68.55% 68.51% -0.05%
+ Complexity 14921 14823 -98
============================================
Files 2636 2636
Lines 222080 221689 -391
Branches 11825 11784 -41
============================================
- Hits 152238 151881 -357
+ Misses 63647 63629 -18
+ Partials 6195 6179 -16 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a couple more comments
...taflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
Show resolved
Hide resolved
return backlogBytes; | ||
} | ||
|
||
public boolean workIsFailed() { | ||
return workIsFailed.get(); | ||
return Optional.ofNullable(work).map(Work::isFailed).orElse(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like work should be marked Nullable since it's not set to non-null in constructor
* {@link DataflowExecutionContext} for use in streaming mode. Contains cached readers and Beam | ||
* state pertaining to a processing its owning computation. Can be reused across processing | ||
* different WorkItems for the same computation. | ||
*/ | ||
@SuppressWarnings({ | ||
"nullness" // TODO(https://github.com/apache/beam/issues/20497) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe this can be removed to catch the missing work Nullable and perhaps other things?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Compiler will complain with under initialized error will update to prevent both
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there are quite a bit of nullability issues not just with work nullability
added a comment to deal with this a follow up PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if the separate state via biulder and then accessor instead of just local variables will lead to performance regression since this is on the hot path.
Since we don't actually clear the workScopedState between work items it seems that the null check will only help before start is ever called and then the additional machinery will be a no-op but more expensive.
Can you revert back to existing local variables for now? Perhaps we can improve nullability/correctness verification while merging this and the ComputationWorkExecutor and do a perf comparison. I don't want to make this big change riskier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done sgtm!
...ker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
Outdated
Show resolved
Hide resolved
run java precommit |
* {@link DataflowExecutionContext} for use in streaming mode. Contains cached readers and Beam | ||
* state pertaining to a processing its owning computation. Can be reused across processing | ||
* different WorkItems for the same computation. | ||
*/ | ||
@SuppressWarnings({ | ||
"nullness" // TODO(https://github.com/apache/beam/issues/20497) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if the separate state via biulder and then accessor instead of just local variables will lead to performance regression since this is on the hot path.
Since we don't actually clear the workScopedState between work items it seems that the null check will only help before start is ever called and then the additional machinery will be a no-op but more expensive.
Can you revert back to existing local variables for now? Perhaps we can improve nullability/correctness verification while merging this and the ComputationWorkExecutor and do a perf comparison. I don't want to make this big change riskier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM just going to run some internal tests before merging
Run Java PreCommit |
includes #31148
StreamingDataflowWorker will eventually just be constructing the harness runtime (different implementations for appliance, dispatched, fanout etc) and the main method.
R: @scwhittle
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.