I'm trying to make an example of a Flux
with Project Reactor that has the following characteristics:
- A single hot observable, which emits one item per second.
- Two subscribers, each of them using a separate thread of the publisher.
- A limited history when calling
replay()
, so some items will be missed in case one of the subscribers is too slow.
Then I coded this sample:
import java.time.Duration;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class FluxTest {
public static void main(String[] args) {
final ConnectableFlux<Integer> publisher = Flux.range(1, 20)
.delayElements(Duration.ofSeconds(1))
.replay(8);
publisher.publishOn(Schedulers.newSingle("fast"))
.subscribe(i -> {
System.out.println("Fast subscriber - Received " + i);
sleep(1);
});
publisher.publishOn(Schedulers.newSingle("slow"))
.subscribe(i -> {
System.out.println("Slow subscriber - Received " + i);
sleep(5);
});
publisher.connect();
}
private static void sleep(int seconds) {
try {
Thread.sleep(seconds * 1000L);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
According to the documentation of replay(int history)
method, I'd expect that, after few seconds, the second consumer (the slow one) would start losing the track, but it doesn't. See this part of the console output as an example:
...
Fast subscriber - Received 14
Fast subscriber - Received 15
Slow subscriber - Received 4
Fast subscriber - Received 16
Fast subscriber - Received 17
I expected the slow subscriber not to be able to receive 4
since that element should no longer be in history (15 - 8 = 7, that should be the last one).
Note that, if I use the method replay(8, Duration.ofSeconds(8))
then I get what I'd expect:
...
Fast subscriber - Received 14
Fast subscriber - Received 15
Slow subscriber - Received 8
Fast subscriber - Received 16
Fast subscriber - Received 17
I think I'm missing something important here, but I don't know what it is.
replay(8)
is capable of replaying 8 elements that where emitted before the subscriber subscribed. For elements that come in after, they are directly relayed to the subscriber. Here you subscribeslow
before you connect, so the size of the replay buffer doesn't really matter.Your slow subscriber sleeps on a dedicated thread, so what happens is that
publishOn
does receive all of the data, puts it in an internalQueue
and schedules itself on theslow
thread to drain that queue, in a drain loop that gets blocked for 5s on each iteration.Still, the operator has seen and is capable of processing all of the data.