
optimize Stream flatMapPar #8863

Merged · 14 commits · May 22, 2024
Conversation

eyalfa (Contributor) commented May 16, 2024

I've introduced a benchmark for flatMapPar; initial results:

Benchmark                          (chunkCount)  (chunkSize)  (parChunkSize)   Mode  Cnt  Score   Error  Units
StreamParBenchmark.akkaFlatMapPar         10000         5000              50  thrpt   15  1.208 ± 0.011  ops/s
StreamParBenchmark.fs2FlatMapPar          10000         5000              50  thrpt   15  0.124 ± 0.009  ops/s
StreamParBenchmark.zioFlatMapPar          10000         5000              50  thrpt   15  0.201 ± 0.002  ops/s

This PR introduces two alternate implementations of ZChannel.mergeAllWith; one of them is roughly 50% faster than the original:

Benchmark                          (chunkCount)  (chunkSize)  (parChunkSize)   Mode  Cnt  Score   Error  Units
StreamParBenchmark.zioFlatMapPar         10000         5000              50  thrpt   15  0.303 ± 0.002  ops/s

The other is over four times faster:

Benchmark                                   (chunkCount)  (chunkSize)  (parChunkSize)   Mode  Cnt   Score   Error  Units
StreamParBenchmark.zioFlatMapPar                   10000         5000              50  thrpt   15   0.866 ± 0.013  ops/s
  • The slower implementation is still required to support MergeStrategy.BufferSliding, hence it could not be discarded.

The first implementation basically reuses the techniques from #8819; it also uses multiple Refs to reduce contention when updating the OutDone value.
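The multiple-Refs idea can be sketched like this (a hypothetical helper, not the PR's actual code): shard the in-progress OutDone across several Refs so that concurrently completing fibers rarely contend on the same cell, and combine the shards only once, at the end.

```scala
import zio._

// Hypothetical sketch: the merged OutDone is sharded across multiple Refs.
// Each fiber folds its completion value into its own shard, so concurrent
// updates hit different cells; the shards are combined once at the end.
final case class ShardedDone[OutDone](cells: Chunk[Ref[Option[OutDone]]]) {

  // Fold a fiber's completion value into its own shard.
  def update(shard: Int, done: OutDone)(f: (OutDone, OutDone) => OutDone): UIO[Unit] =
    cells(shard).update {
      case None       => Some(done)
      case Some(prev) => Some(f(prev, done))
    }

  // Combine all shards into the final OutDone once everything has completed.
  def result(f: (OutDone, OutDone) => OutDone): UIO[Option[OutDone]] =
    ZIO.foreach(cells)(_.get).map(_.flatten.reduceOption(f))
}

object ShardedDone {
  def make[OutDone](n: Int): UIO[ShardedDone[OutDone]] =
    ZIO
      .foreach(Chunk.fromIterable(0 until n))(_ => Ref.make(Option.empty[OutDone]))
      .map(ShardedDone(_))
}
```

With one shard per worker fiber, each update is effectively uncontended, at the cost of a small combine step at completion.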

The faster implementation takes a different approach: instead of spawning a fiber per nested stream, it spawns n worker fibers.
The workers compete over upstream elements using a queue and write the nested streams' elements and completions to a second queue.
Upstream completion places a special message in the second queue, consisting of upstream's completion value and the number of nested streams. The channel draining the second queue keeps track of the number of completions and of the special upstream-completion message, in order to aggregate the final OutDone value and detect completion of the merged stream.
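A rough, self-contained sketch of this scheme (all names are hypothetical; the actual PR works at the ZChannel level and handles errors and interruption, which this sketch omits):

```scala
import zio._
import zio.stream._

object WorkerMergeSketch {
  // Message protocol on the second (output) queue.
  sealed trait Msg[+A]
  final case class Elem[A](a: A)            extends Msg[A]       // element of a nested stream
  case object StreamDone                    extends Msg[Nothing] // one nested stream finished
  final case class UpstreamDone(total: Int) extends Msg[Nothing] // upstream finished; total stream count

  def mergeWorkers[A](
    upstream: ZStream[Any, Nothing, ZStream[Any, Nothing, A]],
    n: Int
  ): ZStream[Any, Nothing, A] =
    ZStream.unwrapScoped {
      for {
        input  <- Queue.bounded[ZStream[Any, Nothing, A]](n)
        output <- Queue.bounded[Msg[A]](n * 2)
        // Feed nested streams to the workers, then announce the total count.
        _ <- upstream
               .runFoldZIO(0)((count, s) => input.offer(s).as(count + 1))
               .flatMap(count => output.offer(UpstreamDone(count)))
               .forkScoped
        // n workers compete over the input queue; no fiber per nested stream.
        _ <- ZIO.foreachDiscard(1 to n) { _ =>
               input.take
                 .flatMap(s => s.runForeach(a => output.offer(Elem(a))) *> output.offer(StreamDone))
                 .forever
                 .forkScoped
             }
      } yield {
        // Drain the output queue, counting completions until all streams are done.
        def drain(done: Int, total: Int): ZStream[Any, Nothing, A] =
          if (total >= 0 && done >= total) ZStream.empty
          else
            ZStream.fromZIO(output.take).flatMap {
              case Elem(a)             => ZStream(a) ++ drain(done, total)
              case StreamDone          => drain(done + 1, total)
              case UpstreamDone(count) => drain(done, count)
            }
        drain(done = 0, total = -1) // -1 = upstream still running
      }
    }
}
```

Once the drain loop terminates, the enclosing scope closes and the worker fibers are interrupted, so the pool's size is fixed at n regardless of how many nested streams pass through.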

This PR also adds a couple of benchmarks showing the effect of chunking on the flatMapPar operator. The first is a bit unfair, as it replaces some of the stream operations with Chunk's equivalent ones, while the second is closer to flatMapPar. Both techniques can only live in user code, as they may cause indefinite blocking for effectful streams.

The complete benchmark results:

Benchmark                                   (chunkCount)  (chunkSize)  (parChunkSize)   Mode  Cnt   Score   Error  Units
StreamParBenchmark.zioFlatMapPar                   10000         5000              50  thrpt   15   0.866 ± 0.013  ops/s
StreamParBenchmark.zioFlatMapParChunks             10000         5000              50  thrpt   15  29.721 ± 0.266  ops/s
StreamParBenchmark.zioFlatMapParChunksFair         10000         5000              50  thrpt   15   2.529 ± 0.033  ops/s

eyalfa (Contributor, Author) commented May 16, 2024

cc @jdegoes

eyalfa (Contributor, Author) commented May 18, 2024

Still working on this. I was able to add another benchmark that (IMHO) shows the bottleneck in this benchmark is actually forking. The slightly modified benchmark works at the chunk level instead of the element level (implementing the same logic), resulting in the same number of enqueue/dequeue operations going through the queue, but a 48x(!) improvement factor over the 'naive' benchmark. I suspect this factor of 48 comes from the chunk size of 50 used by the streams benchmark, and it implies that the heavy lifting here is starting and managing fibers.

benchmark code:

@Benchmark
def zioFlatMapParChunks: Long = {
  val result = ZStream
    .fromIterable(zioChunks)
    .flatMapPar(4) { c =>
      val cc = c.flatMap(i => Chunk(i, i + 1))
      ZStream.fromChunk(cc)
    }
    .runCount

  unsafeRun(result)
}

Benchmark results:

Benchmark                               (chunkCount)  (chunkSize)  (parChunkSize)   Mode  Cnt   Score   Error  Units
StreamParBenchmark.zioFlatMapPar               10000         5000              50  thrpt   15   0.309 ± 0.003  ops/s
StreamParBenchmark.zioFlatMapParChunks         10000         5000              50  thrpt   15  14.463 ± 0.316  ops/s

eyalfa (Contributor, Author) commented May 19, 2024

I rushed a bit into conclusions; the benchmark wasn't a fair comparison to flatMapPar, though it did inspire a very fast alternative implementation (see the updated PR description).

jdegoes (Member) commented May 21, 2024

@eyalfa Have you thought about re-using the same n fibers, rather than continuously forking more?

eyalfa (Contributor, Author) commented May 22, 2024

> @eyalfa Have you thought about re-using the same n fibers, rather than continuously forking more?

@jdegoes, I did 😎
See the PR's description for details, including the final benchmark results. What happened is that I understood the cost of forking only after adding the 'batched' benchmarks; this sparked the idea of using a 'fiber pool', which proved to be very performant.
See my comments on the two implementations in the diff.

varshith257 commented:

@eyalfa Can you review #8879?

I have been playing with the tapSink behaviour for two days and finally posted my results there 😅

],
n: => Int,
bufferSize: => Int /* = 16*/,
mergeStrategy: MergeStrategy.BackPressure.type
eyalfa (Contributor, Author) commented on this diff:

@jdegoes
This implementation reuses the fibers; it gradually allocates a 'fiber pool'.
I experimented with ways to avoid fiber allocation, but at least the approaches I attempted came with some performance penalty. The unbounded use case will avoid allocating a new fiber if the queue becomes empty immediately after the offer, but this approach seemed too expensive for the common case (TODO: add benchmarks for the unbounded scenario).
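The gradual allocation might be pictured like this (a deliberately simplified, hypothetical sketch, not the PR's code): a new worker fiber is forked only while the pool is below its cap, so a lightly loaded channel never pays for all the forks up front.

```scala
import zio._

// Hypothetical sketch of a gradually allocated fiber pool: each submit enqueues
// the task, and a new worker is forked only while fewer than `max` exist. Once
// the cap is reached, existing workers absorb all further tasks.
final class GrowingPool(max: Int, size: Ref[Int], queue: Queue[UIO[Any]]) {

  def submit(task: UIO[Any]): URIO[Scope, Unit] =
    queue.offer(task) *>
      size.modify { n =>
        if (n < max) (true, n + 1) // claim a slot for a new worker
        else (false, n)            // pool is full; rely on existing workers
      }.flatMap {
        case true  => queue.take.flatMap(t => t).forever.forkScoped.unit
        case false => ZIO.unit
      }
}

object GrowingPool {
  def make(max: Int): UIO[GrowingPool] =
    for {
      size  <- Ref.make(0)
      queue <- Queue.unbounded[UIO[Any]]
    } yield new GrowingPool(max, size, queue)
}
```

The workers are forked into the surrounding scope, so they are interrupted when the channel's scope closes.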

resSch
}

mergeStrategy match {
eyalfa (Contributor, Author) commented on this diff:

@jdegoes, this is where the 'fiber pool' approach is selected. Notice that the BufferSliding strategy effectively relies on a fiber per operation.
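A self-contained sketch of the selection logic (all names hypothetical; the diff suggests the original implementation was kept under the name mergeAllWith0):

```scala
// Hypothetical sketch: BufferSliding keeps the original fiber-per-stream
// implementation it depends on, while BackPressure can use the fiber pool.
sealed trait MergeStrategy
object MergeStrategy {
  case object BackPressure  extends MergeStrategy
  case object BufferSliding extends MergeStrategy
}

def selectImpl(mergeStrategy: MergeStrategy): String =
  mergeStrategy match {
    case MergeStrategy.BackPressure  => "fiber-pool implementation"
    case MergeStrategy.BufferSliding => "fiber-per-stream implementation (mergeAllWith0)"
  }
```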

@@ -1848,7 +1848,7 @@ object ZChannel {
): ZChannel[Env, InErr, InElem, InDone, OutErr, OutElem, OutDone] =
mergeAllWith(channels, Int.MaxValue)(f)

-  def mergeAllWith[Env, InErr, InElem, InDone, OutErr, OutElem, OutDone](
+  def mergeAllWith0[Env, InErr, InElem, InDone, OutErr, OutElem, OutDone](
eyalfa (Contributor, Author) commented on this diff:

@jdegoes, the original implementation is temporarily kept around; it will be dropped before merge (if the PR is accepted).

jdegoes (Member) commented on this diff:

Was it dropped? 🤔

jdegoes (Member) commented:

@eyalfa Was it dropped? 🤔

eyalfa (Contributor, Author) commented May 22, 2024

> @eyalfa Can you review #8879?
>
> I have been playing with the tapSink behaviour for two days and finally posted my results there 😅

Sure, I'll have a look, but next time please comment on the relevant issue/PR; this is a different PR, unrelated to the tapSink issue.

varshith257 commented May 22, 2024

> I'll have a look, but next time comment on the relevant issue/PR; this is a different PR, unrelated to the tapSink issue

I saw your reply on mobile and replied from there, and didn't notice this was a different PR :)

jdegoes (Member) commented May 22, 2024

@eyalfa Strong work! 💪

@jdegoes jdegoes merged commit 3e50286 into zio:series/2.x May 22, 2024
21 checks passed