I'm trying to make an example of a Flux
with Project Reactor that has the following characteristics:
- A single hot observable, which emits one item per second.
- Two subscribers, each of them using a separate thread of the publisher.
- A limited history when calling
replay()
, so some items will be missed in case one of the subscribers is too slow.
Then I coded this sample:
import java.time.Duration;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class FluxTest {
public static void main(String[] args) {
final ConnectableFlux<Integer> publisher = Flux.range(1, 20)
.delayElements(Duration.ofSeconds(1))
.replay(8);
publisher.publishOn(Schedulers.newSingle("fast"))
.subscribe(i -> {
System.out.println("Fast subscriber - Received " + i);
sleep(1);
});
publisher.publishOn(Schedulers.newSingle("slow"))
.subscribe(i -> {
System.out.println("Slow subscriber - Received " + i);
sleep(5);
});
publisher.connect();
}
private static void sleep(int seconds) {
try {
Thread.sleep(seconds * 1000L);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
According to the documentation of replay(int history)
method, I'd expect that, after few seconds, the second consumer (the slow one) would start losing the track, but it doesn't. See this part of the console output as an example:
...
Fast subscriber - Received 14
Fast subscriber - Received 15
Slow subscriber - Received 4
Fast subscriber - Received 16
Fast subscriber - Received 17
I expected the slow subscriber not to be able to receive 4
since that element should no longer be in history (15 - 8 = 7, that should be the last one).
Note that, if I use the method replay(8, Duration.ofSeconds(8))
then I get what I'd expect:
...
Fast subscriber - Received 14
Fast subscriber - Received 15
Slow subscriber - Received 8
Fast subscriber - Received 16
Fast subscriber - Received 17
I think I'm missing something important here, but I don't know what it is.