Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement simple atomic stream select #585

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

dermesser
Copy link

for #577, to supersede #578.

I want to emphasize that this change is based on my likely too superficial understanding of the library. It is likely that there exists a more elegant way to implement it.

  • re: Feature request: Stream select #577 (comment) - this is currently worked around by checking every stream for present items and starting to wait while holding its mutex; if no stream has any items, all streams must have capacity (this is asserted). Maybe there is a hole in my logic, but it sounded fine in my head.
  • I had to modify the Waiters module slightly, which is what I had hoped to avoid by using multiple fibers previously. The way I modified it makes sense for this kind of application, but it could be that there is a race condition or logical trap I overlooked.
  • There are too many in-line comments explaining what the too-long function select_of_many does, but maybe it helps you while reviewing it :-)
  • Only Locking streams are handled at the moment, with an assert in place to enforce this. Mostly for the reason that I've wanted to get an initial implementation going before trying to handle everything.

Again, I'm not at all offended if a tighter implementation ends up replacing this PR, but my fingers were itching to try and implement it myself :-)

@dermesser
Copy link
Author

I'm currently also trying to understand the 0-capacity stream mechanics, and hopefully will come up with a way to (ideally) select out of a mixed set of streams.

then (
cancel_all ();
Mutex.unlock t.mutex;
enqueue (Ok (f (Queue.take t.items))))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another domain could still have set finished to true using a different stream by the time this runs.

(also, you can't use Queue.take after unlocking)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for missing this, it should have been obvious! Please take another look, it seemed not too difficult to fix.

@dermesser
Copy link
Author

I've added a small benchmark (f733045) which helped fix two bugs (the two preceding commits).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants