optimize Stream flatMapPar #8863
Conversation
cc @jdegoes

Still working on this. I was able to add another benchmark that (IMHO) proves that the bottleneck in this benchmark is actually forking. The slightly modified benchmark works at the chunk level instead of the element level (implementing the same logic), resulting in the same number of enqueue/dequeue operations going through the queue but an improvement factor of 48(!!) over the 'naive' benchmark. I suspect this factor of 48 comes from the chunk size of 50 used by the streams benchmark, and it implies that the heavy lifting here is starting and managing fibers. Benchmark code:

Benchmark results:
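As a rough, hypothetical sketch of the chunk-level idea described above (not the actual benchmark code from this PR, which isn't shown here), using the ZIO 2 `ZStream` API:

```scala
import zio._
import zio.stream._

// Hypothetical sketch: run the flatMapPar logic per chunk instead of per element,
// so the queue sees a similar number of enqueue/dequeue operations but far fewer
// fibers are forked. Chunk size, stream length, and parallelism are illustrative.
val chunkLevel: ZIO[Any, Nothing, Unit] =
  ZStream
    .range(0, 10000, chunkSize = 50)
    .chunks                                     // ZStream[Any, Nothing, Chunk[Int]]
    .flatMapPar(4) { chunk =>
      // one nested stream per chunk rather than per element
      ZStream.fromZIO(ZIO.succeed(chunk.map(_ + 1)))
    }
    .flattenChunks
    .runDrain
```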
Rushed a bit into conclusions, the benchmark wasn't a fair comparison to
@eyalfa Have you thought about re-using the same
@jdegoes, I did 😎
],
n: => Int,
bufferSize: => Int /* = 16*/,
mergeStrategy: MergeStrategy.BackPressure.type
@jdegoes
This implementation reuses the fibers; it gradually allocates a 'fibers pool'.
I experimented with ways to avoid fiber allocation, but at least the approaches I attempted resulted in some performance penalty. The unbounded use-case will avoid allocating a new fiber if the queue becomes empty immediately after offer, but this approach seemed too expensive for the common case (todo here: add benchmarks for the unbounded scenario).
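As a hedged illustration of the gradually allocated 'fibers pool' idea (the structure and names below are assumptions, not this PR's actual code), a pool could fork a new worker only while fewer than `n` workers exist, with workers then looping over a shared task queue:

```scala
import zio._

// Sketch only: a pool that grows one worker fiber at a time, up to `n`.
// Each worker repeatedly takes a task from the queue and runs it, so fibers
// are reused across nested streams instead of being forked per stream.
final case class WorkerPool(n: Int, tasks: Queue[UIO[Unit]], started: Ref[Int]) {
  private val workerLoop: UIO[Nothing] =
    tasks.take.flatten.forever

  def submit(task: UIO[Unit]): URIO[Scope, Unit] =
    tasks.offer(task) *>
      started
        .modify(c => if (c < n) (true, c + 1) else (false, c))
        .flatMap(grow => workerLoop.forkScoped.when(grow)) // fork only while below n
        .unit
}

object WorkerPool {
  def make(n: Int, capacity: Int): UIO[WorkerPool] =
    for {
      q <- Queue.bounded[UIO[Unit]](capacity)
      c <- Ref.make(0)
    } yield WorkerPool(n, q, c)
}
```

With a bounded task queue, `offer` suspends when all workers are busy, which is roughly how a back-pressure-style merge could avoid unbounded buffering.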
resSch
}

mergeStrategy match {
@jdegoes, this is where the 'fibers pool' approach is selected. Notice that the BufferSliding strategy effectively relies on a fiber per operation.
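A minimal sketch of that dispatch, with stand-in definitions and placeholder thunks rather than ZIO's actual ones:

```scala
// Stand-in ADT for illustration only.
sealed trait MergeStrategy
object MergeStrategy {
  case object BackPressure  extends MergeStrategy
  case object BufferSliding extends MergeStrategy
}

def selectImplementation[A](strategy: MergeStrategy)(pooled: => A, fiberPerStream: => A): A =
  strategy match {
    case MergeStrategy.BackPressure  => pooled          // reuse the worker-fiber pool
    case MergeStrategy.BufferSliding => fiberPerStream  // effectively needs a fiber per operation
  }
```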
@@ -1848,7 +1848,7 @@ object ZChannel {
  ): ZChannel[Env, InErr, InElem, InDone, OutErr, OutElem, OutDone] =
    mergeAllWith(channels, Int.MaxValue)(f)

-  def mergeAllWith[Env, InErr, InElem, InDone, OutErr, OutElem, OutDone](
+  def mergeAllWith0[Env, InErr, InElem, InDone, OutErr, OutElem, OutDone](
@jdegoes, the original implementation is temporarily kept around; it will be dropped before merge (if the PR is accepted).
Was it dropped? 🤔
@eyalfa Was it dropped? 🤔
I saw your reply from mobile and replied from there, and didn't see this other comment on the PR :)
@eyalfa Strong work! 💪
I've introduced a benchmark for `flatMapPar`, initial results:

This PR introduces two alternate implementations of `ZChannel.mergeAllWith`. One of them is roughly 50% faster than the original:

The other is over 4 times faster:

Only the first (slower) of the two supports `MergeStrategy.BufferSliding`, hence it could not be discarded.

The first implementation basically reuses the same techniques used in #8819; it also uses multiple `Ref`s to reduce contention when updating the `OutDone` value.
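As a hedged sketch of the multiple-`Ref` idea (illustrative only, not the PR's `ZChannel` code), each worker can fold the done values it observes into its own `Ref`, with the per-worker partial results combined once at the end:

```scala
import zio._

// Sketch: one Ref per worker instead of a single shared Ref, so concurrent updates
// don't contend; the partial results are merged only once, after all workers finish.
def foldDones[Done](n: Int)(f: (Done, Done) => Done)(
  runWorker: Ref[Option[Done]] => UIO[Unit]
): UIO[Option[Done]] =
  for {
    refs  <- ZIO.foreach((1 to n).toList)(_ => Ref.make(Option.empty[Done]))
    _     <- ZIO.foreachParDiscard(refs)(runWorker) // each worker touches only its own Ref
    dones <- ZIO.foreach(refs)(_.get)
  } yield dones.flatten.reduceOption(f)
```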
The faster implementation takes a different approach: instead of spawning a fiber per nested stream, it spawns `n` worker fibers. The workers compete over upstream elements using a queue and write the nested streams' elements and completions to a second queue.

Upstream completion places a special message in the second queue, consisting of the upstream's completion value and the number of nested streams. The channel draining the second queue keeps track of the number of completions and of the special upstream-completion message in order to aggregate the final `OutDone` value and detect stream completion.
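A simplified, hedged sketch of that protocol, using plain queues and lists instead of `ZChannel`s (all names and types here are illustrative assumptions, not the PR's code):

```scala
import zio._

object MergeAllSketch {

  // Messages flowing through the output queue.
  sealed trait Msg[+A, +Done]
  final case class Elem[A](value: A)                            extends Msg[A, Nothing]
  final case class StreamDone[Done](done: Done)                 extends Msg[Nothing, Done]
  final case class UpstreamDone[Done](done: Done, streams: Int) extends Msg[Nothing, Done]

  // `n` workers compete over nested "streams" (here just a list of elements plus a
  // done value) taken from a work queue, pushing elements and completions to `out`.
  // Once the upstream is exhausted, UpstreamDone announces how many nested streams
  // were produced, so the drainer knows how many StreamDone messages to expect.
  def merged[A, Done](n: Int, upstream: List[(List[A], Done)], upstreamDone: Done)(
    combine: (Done, Done) => Done
  ): UIO[(List[A], Done)] =
    for {
      work   <- Queue.unbounded[(List[A], Done)]
      out    <- Queue.unbounded[Msg[A, Done]]
      _      <- work.offerAll(upstream)
      worker  = work.poll.flatMap {
                  case Some((as, done)) =>
                    ZIO.foreachDiscard(as)(a => out.offer(Elem(a))) *>
                      out.offer(StreamDone(done)).as(true)
                  case None => ZIO.succeed(false)
                }
      fibers <- ZIO.foreach((1 to n).toList)(_ => worker.repeatWhile(identity).fork)
      _      <- ZIO.foreachDiscard(fibers)(_.join)
      _      <- out.offer(UpstreamDone(upstreamDone, upstream.size))
      result <- drain(out)(combine)
    } yield result

  // The drainer counts StreamDone messages and folds all done values together; it
  // stops once UpstreamDone has been seen and its announced count has been reached.
  def drain[A, Done](out: Queue[Msg[A, Done]])(combine: (Done, Done) => Done): UIO[(List[A], Done)] = {
    def loop(elems: List[A], acc: Option[Done], seen: Int, expected: Option[Int]): UIO[(List[A], Done)] =
      (acc, expected) match {
        case (Some(done), Some(total)) if seen == total => ZIO.succeed((elems.reverse, done))
        case _ =>
          out.take.flatMap {
            case Elem(a)                => loop(a :: elems, acc, seen, expected)
            case StreamDone(d)          => loop(elems, Some(acc.fold(d)(combine(_, d))), seen + 1, expected)
            case UpstreamDone(d, total) => loop(elems, Some(acc.fold(d)(combine(_, d))), seen, Some(total))
          }
      }
    loop(Nil, None, 0, None)
  }
}
```

In the actual implementation the workers pull nested streams lazily and the drainer runs concurrently as a channel; this sketch only tries to show the completion-counting protocol.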
This PR also adds a couple of benchmarks showing the effect of chunking on the `flatMapPar` operator. The first one is a bit unfair, as it replaces some of the stream operations with `Chunk`'s equivalent ones, while the second one is closer to `flatMapPar`. Both techniques can only live in user code, as they may cause indefinite blocking in the case of effectful streams.

The complete benchmark results: