When FlatMap will listen to multiple sources concu

2019-08-25 08:54发布

问题:

What are the situations which cause Flux::flatMap to listen to multiple sources (0...infinity) concurrently?


I found out, while experimenting, that when the upstream send signals to flatMap in thread thread-upstream-1 and there are N inner streams which flatMap will listen to and each of them send signals in different thread: thread-inner-stream-i for 1<=i<=N, than for every 1<=i<=N if thread-upstream-1 != thread-inner-stream-i, flatMap will listen concurrently to all the inner streams.

I think that it's not exactly true and I missed some other scenarios.

回答1:

flatMap doesn't do any parallel work, as in: it doesn't change threads. The simplest example is

Flux.range(1, 5).hide()
    .flatMap(v -> Flux.range(10 * v, 2))
    .log()
    .blockLast(); //for test purpose

This prints:

[main] INFO  reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO  reactor.Flux.FlatMap.1 - request(unbounded)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(10)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(11)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(20)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(21)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(30)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(31)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(40)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(41)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(50)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(51)
[main] INFO  reactor.Flux.FlatMap.1 - onComplete()

As you can see, only produces in main. If you add a publishOn after the initial range, flatMap produces everything in the same single thread publishOn will switch to.

What flatMap does however is subscribe to multiple inner Publisher, up to the concurrency parameter with a default of Queues.SMALL_BUFFER_SIZE (256).

That means that if you set it to 3, flatMap will map 3 source elements to their inner Publisher and subscribe to these publishers, but will wait for at least one to complete before it starts mapping more source elements.

If the inner Publisher use publishOn or subscribeOn, then flatMap will naturally let their events occur in the then-defined threads:

Flux.range(1, 5).hide()
    .flatMap(v -> Flux.range(v * 10, 2)
                      .publishOn(Schedulers.newParallel("foo", 3)))
    .flatMap(v -> Flux.range(10 * v, 2))
    .log()
    .blockLast(); //for test purpose

Which prints:

[main] INFO  reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO  reactor.Flux.FlatMap.1 - request(unbounded)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(10)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(11)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(20)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(21)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(30)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(31)
[foo-4] INFO  reactor.Flux.FlatMap.1 - onNext(50)
[foo-4] INFO  reactor.Flux.FlatMap.1 - onNext(51)
[foo-4] INFO  reactor.Flux.FlatMap.1 - onNext(40)
[foo-4] INFO  reactor.Flux.FlatMap.1 - onNext(41)
[foo-4] INFO  reactor.Flux.FlatMap.1 - onComplete()