What are the situations which cause Flux::flatMap
to listen to multiple sources (0...infinity) concurrently?
I found out, while experimenting, that when the upstream send signals to flatMap
in thread thread-upstream-1
and there are N
inner streams which flatMap will listen to and each of them send signals in different thread: thread-inner-stream-i
for 1<=i<=N
, than for every 1<=i<=N
if thread-upstream-1 != thread-inner-stream-i
, flatMap
will listen concurrently to all the inner streams.
I think that it's not exactly true and I missed some other scenarios.
flatMap
doesn't do any parallel work, as in: it doesn't change threads. The simplest example is
Flux.range(1, 5).hide()
.flatMap(v -> Flux.range(10 * v, 2))
.log()
.blockLast(); //for test purpose
This prints:
[main] INFO reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.FlatMap.1 - request(unbounded)
[main] INFO reactor.Flux.FlatMap.1 - onNext(10)
[main] INFO reactor.Flux.FlatMap.1 - onNext(11)
[main] INFO reactor.Flux.FlatMap.1 - onNext(20)
[main] INFO reactor.Flux.FlatMap.1 - onNext(21)
[main] INFO reactor.Flux.FlatMap.1 - onNext(30)
[main] INFO reactor.Flux.FlatMap.1 - onNext(31)
[main] INFO reactor.Flux.FlatMap.1 - onNext(40)
[main] INFO reactor.Flux.FlatMap.1 - onNext(41)
[main] INFO reactor.Flux.FlatMap.1 - onNext(50)
[main] INFO reactor.Flux.FlatMap.1 - onNext(51)
[main] INFO reactor.Flux.FlatMap.1 - onComplete()
As you can see, only produces in main
. If you add a publishOn
after the initial range, flatMap
produces everything in the same single thread publishOn will switch to.
What flatMap
does however is subscribe to multiple inner Publisher
, up to the concurrency
parameter with a default of Queues.SMALL_BUFFER_SIZE
(256).
That means that if you set it to 3
, flatMap
will map 3 source elements to their inner Publisher
and subscribe to these publishers, but will wait for at least one to complete before it starts mapping more source elements.
If the inner Publisher
use publishOn
or subscribeOn
, then flatMap
will naturally let their events occur in the then-defined threads:
Flux.range(1, 5).hide()
.flatMap(v -> Flux.range(v * 10, 2)
.publishOn(Schedulers.newParallel("foo", 3)))
.flatMap(v -> Flux.range(10 * v, 2))
.log()
.blockLast(); //for test purpose
Which prints:
[main] INFO reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.FlatMap.1 - request(unbounded)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(10)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(11)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(20)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(21)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(30)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(31)
[foo-4] INFO reactor.Flux.FlatMap.1 - onNext(50)
[foo-4] INFO reactor.Flux.FlatMap.1 - onNext(51)
[foo-4] INFO reactor.Flux.FlatMap.1 - onNext(40)
[foo-4] INFO reactor.Flux.FlatMap.1 - onNext(41)
[foo-4] INFO reactor.Flux.FlatMap.1 - onComplete()