-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make GrpcCommitWorkStream thread-safe as documented by moving batcher out of it. #31304
Conversation
Increase the number of streams in commit cache to number of threads
R: @m-trieu |
Sending this for pre-review, I think I have to fix a hotkeys test still. And I would like to increase the streampool size to match # of commit threads (or perhaps will do that separately). |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initial pass
@@ -68,21 +70,34 @@ Windmill.KeyedGetDataResponse requestKeyedData( | |||
/** Interface for streaming CommitWorkRequests to Windmill. */ | |||
@ThreadSafe | |||
interface CommitWorkStream extends WindmillStream { | |||
@NotThreadSafe | |||
interface RequestBatcher extends Closeable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this may need to be AutoCloseable for close() to be automatically called after try with resources block. Or else may need to explicitly call close().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Closeable (in io package) extends AutoCloseable https://docs.oracle.com/javase/8/docs/api/java/io/Closeable.html
"Initial commit on flushed stream should always be accepted."); | ||
} | ||
// Batch additional commits to the stream and possibly make an un-batched commit the | ||
// next |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit comment formatting
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
private final AtomicLong idGenerator; | ||
private final JobHeader jobHeader; | ||
private final ThrottleTimer commitWorkThrottleTimer; | ||
private final int streamingRpcBatchLimit; | ||
private Batcher batcher = new Batcher(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this be final
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, actually this was leftover from testing that my unit tests would catch the lack of thread-safety. We should be creating a new batcher for each call instead of sharing the non-thread-safe batcher (similar to existing code)
@Override | ||
public boolean commitWorkItem( | ||
String computation, WorkItemCommitRequest commitRequest, Consumer<CommitStatus> onDone) { | ||
PendingRequest request = new PendingRequest(computation, commitRequest, onDone); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we can't accept the PendingRequest
we just construct this object and throw it way. How about passing long request.getSerializedSize() + computation.length()
which is what PendingRequest.getBytes()
returns into canAccept
, and only creating the PendingRequest
when we call add.
and only
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -305,7 +297,7 @@ long getBytes() { | |||
} | |||
} | |||
|
|||
private class Batcher { | |||
private class Batcher implements CommitWorkStream.RequestBatcher { | |||
|
|||
private final Map<Long, PendingRequest> queue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't this still throw ConcurrentModificationException
if queue
is accessed by multiple threads? Should we make this ConcurrentHashMap
vs HashMap
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See other comment, batcher is not thread-safe but we shouldn't have a batcher member just vend new batchers. Was leftover from testing.
speed up test to avoid flakiness
The hotkeys test flakiness appeared to be due to the slow shutdown of the worker due to the committers. I changed them to interrupt without 10 second wait since the threads are going to be waiting on the queue most likely, and then improved the interrupt handling. |
Also changed to have the stream pool equal to the # of threads so that we have more streaming rpcs for the threads to use. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Make the batcher Closeable which helps users remember to flush
Improve testing of multithreaded scenarios for commits, which would have failed with ConcurrentModificationException as observed in running pipeline.
Addresses #31303
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.