Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make GrpcCommitWorkStream thread-safe as documented by moving batcher out of it. #31304

Merged
merged 3 commits into from
May 21, 2024

Conversation

scwhittle
Copy link
Contributor

Make the batcher Closeable which helps users remember to flush
Improve testing of multithreaded scenarios for commits, which would have failed with ConcurrentModificationException as observed in running pipeline.

Addresses #31303


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Increase the number of streams in commit cache to number of threads
@scwhittle
Copy link
Contributor Author

R: @m-trieu

@scwhittle
Copy link
Contributor Author

Sending this for pre-review, I think I have to fix a hotkeys test still. And I would like to increase the streampool size to match # of commit threads (or perhaps will do that separately).

Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

Copy link
Contributor

@m-trieu m-trieu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initial pass

@@ -68,21 +70,34 @@ Windmill.KeyedGetDataResponse requestKeyedData(
/** Interface for streaming CommitWorkRequests to Windmill. */
@ThreadSafe
interface CommitWorkStream extends WindmillStream {
@NotThreadSafe
interface RequestBatcher extends Closeable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this may need to be AutoCloseable for close() to be automatically called after try with resources block. Or else may need to explicitly call close().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Closeable (in io package) extends AutoCloseable https://docs.oracle.com/javase/8/docs/api/java/io/Closeable.html

"Initial commit on flushed stream should always be accepted.");
}
// Batch additional commits to the stream and possibly make an un-batched commit the
// next
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit comment formatting

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

private final AtomicLong idGenerator;
private final JobHeader jobHeader;
private final ThrottleTimer commitWorkThrottleTimer;
private final int streamingRpcBatchLimit;
private Batcher batcher = new Batcher();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be final

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, actually this was leftover from testing that my unit tests would catch the lack of thread-safety. We should be creating a new batcher for each call instead of sharing the non-thread-safe batcher (similar to existing code)

@Override
public boolean commitWorkItem(
String computation, WorkItemCommitRequest commitRequest, Consumer<CommitStatus> onDone) {
PendingRequest request = new PendingRequest(computation, commitRequest, onDone);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we can't accept the PendingRequest we just construct this object and throw it way. How about passing long request.getSerializedSize() + computation.length() which is what PendingRequest.getBytes() returns into canAccept, and only creating the PendingRequest when we call add.

and only

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -305,7 +297,7 @@ long getBytes() {
}
}

private class Batcher {
private class Batcher implements CommitWorkStream.RequestBatcher {

private final Map<Long, PendingRequest> queue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this still throw ConcurrentModificationException if queue is accessed by multiple threads? Should we make this ConcurrentHashMap vs HashMap?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See other comment, batcher is not thread-safe but we shouldn't have a batcher member just vend new batchers. Was leftover from testing.

speed up test to avoid flakiness
@scwhittle
Copy link
Contributor Author

The hotkeys test flakiness appeared to be due to the slow shutdown of the worker due to the committers. I changed them to interrupt without 10 second wait since the threads are going to be waiting on the queue most likely, and then improved the interrupt handling.

@scwhittle
Copy link
Contributor Author

Also changed to have the stream pool equal to the # of threads so that we have more streaming rpcs for the threads to use.

Copy link
Contributor

@m-trieu m-trieu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@scwhittle scwhittle merged commit 89795c0 into apache:master May 21, 2024
17 checks passed
@scwhittle scwhittle deleted the commit_threads branch May 21, 2024 08:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants