-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
move heartbeat processor to where it is being used #31298
base: master
Are you sure you want to change the base?
Conversation
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
2110050
to
d678be8
Compare
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #31298 +/- ##
=========================================
Coverage 68.55% 68.55%
Complexity 14921 14921
=========================================
Files 2636 2636
Lines 222073 222069 -4
Branches 11825 11825
=========================================
- Hits 152234 152233 -1
+ Misses 63644 63641 -3
Partials 6195 6195 ☔ View full report in Codecov by Sentry. |
c6e3846
to
c3aa698
Compare
Assigning reviewers. If you would like to opt out of this review, comment R: @johnjcasey added as fallback since no labels match configuration Available commands:
The PR bot will only process comments in the main thread (not review comments). |
GrpcWindmillStreamFactory windmillStreamFactory = | ||
createWindmillStreamFactory(options, clientId); | ||
GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder = | ||
createWindmillStreamFactoryBuilder(options, clientId); | ||
GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.create(createStubFactory(options)); | ||
|
||
// If ComputationConfig.Fetcher is the Streaming Appliance implementation, WindmillServerStub |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is some kind of a cyclic dependency between windmillStreamFactory and computationStateCache and the creation order is different between appliance and SE, which makes the code hard to reason about. How about something like arunpandianp@d428be0? Then we don't need these comments explaining if something then it is appliance
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
c3aa698
to
d5aebd7
Compare
GrpcWindmillStreamFactory windmillStreamFactory = | ||
createWindmillStreamFactory(options, clientId); | ||
GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder = | ||
createWindmillStreamFactoryBuilder(options, clientId); | ||
GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.create(createStubFactory(options)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move construction of dispatcherClient inside createConfigFetcherComputationStateCacheAndWindmillClient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -393,7 +384,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o | |||
return new StreamingDataflowWorker( | |||
windmillServer, | |||
clientId, | |||
configFetcherAndWindmillClient.getLeft(), | |||
configFetcherComputationStateCacheAndWindmillClient.configFetcher(), | |||
computationStateCache, | |||
WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be windmillStateCache?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
private static ConfigFetcherComputationStateCacheAndWindmillClient create( | ||
ComputationConfig.Fetcher configFetcher, | ||
ComputationStateCache computationStateCache, | ||
Pair<WindmillServerStub, @Nullable GrpcWindmillStreamFactory> windmillClient) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
split pair into separate members?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -185,7 +186,7 @@ private StreamingDataflowWorker( | |||
StreamingCounters streamingCounters, | |||
MemoryMonitor memoryMonitor, | |||
AtomicInteger maxWorkItemCommitBytes, | |||
GrpcWindmillStreamFactory windmillStreamFactory, | |||
@Nullable GrpcWindmillStreamFactory windmillStreamFactory, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need both windmillServer and windmillStreamFactory here?
I see windmillServer.appendSummaryHtml
in turn calls windmillStreamFactory.appendSummaryHtml
, can we remove windmillStreamFactory
from here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need both since windmillStreamFactory is source of truth (it is used in GrpcWindmillServer#appendSummaryHtml
)
In direct path mode we don't use GrpcWindmillServer/WindmillServerStub
just inject windmillStreamFactory directly so will need that to add to status pages
new WorkHeartbeatResponseProcessor(computationStateCache::get)) | ||
.build(); | ||
if (options.isEnableStreamingEngine()) { | ||
windmillStreamFactory.scheduleHealthChecks( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like is missing on non test flow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
d5aebd7
to
b2b8a45
Compare
b2b8a45
to
08350cf
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just the one comment, rest looks good to me.
.setSendKeyedGetDataRequests( | ||
!options.isEnableStreamingEngine() | ||
|| !DataflowRunner.hasExperiment( | ||
options, "streaming_engine_send_new_heartbeat_requests")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not set the health check here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
synced offline
@johnjcasey ready to merge thanks! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need to fix spotless.
Heartbeat response processor should be close to where it is being used as we don't change the heartbeat response processor at all.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.