Reactor Flux replay(int history) method not working

Posted 2019-08-02 18:18

I'm trying to make an example of a Flux with Project Reactor that has the following characteristics:

  • A single hot observable, which emits one item per second.
  • Two subscribers, each running on its own thread, separate from the publisher's.
  • A limited history when calling replay(), so some items will be missed in case one of the subscribers is too slow.

Then I coded this sample:

import java.time.Duration;

import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class FluxTest {

  public static void main(String[] args) {
    final ConnectableFlux<Integer> publisher = Flux.range(1, 20)
      .delayElements(Duration.ofSeconds(1))
      .replay(8);

    publisher.publishOn(Schedulers.newSingle("fast"))
      .subscribe(i -> {
        System.out.println("Fast subscriber - Received " + i);
        sleep(1);
      });

    publisher.publishOn(Schedulers.newSingle("slow"))
      .subscribe(i -> {
        System.out.println("Slow subscriber - Received " + i);
        sleep(5);
      });

    publisher.connect();
  }

  private static void sleep(int seconds) {
    try {
      Thread.sleep(seconds * 1000L);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

According to the documentation of the replay(int history) method, I'd expect that, after a few seconds, the second consumer (the slow one) would start falling behind and missing items, but it doesn't. See this part of the console output as an example:

...
Fast subscriber - Received 14
Fast subscriber - Received 15
Slow subscriber - Received 4
Fast subscriber - Received 16
Fast subscriber - Received 17

I expected the slow subscriber not to be able to receive 4, since that element should no longer be in the history (15 - 8 = 7, so everything up to 7 should already have been dropped).

Note that, if I use the method replay(8, Duration.ofSeconds(8)) then I get what I'd expect:

...
Fast subscriber - Received 14
Fast subscriber - Received 15
Slow subscriber - Received 8
Fast subscriber - Received 16
Fast subscriber - Received 17

I think I'm missing something important here, but I don't know what it is.

1 answer
贪生不怕死
#2 · 2019-08-02 18:40

replay(8) is capable of replaying 8 elements that were emitted before the subscriber subscribed. Elements that arrive after that are relayed directly to the subscriber. Here you subscribe slow before you connect, so the size of the replay buffer doesn't really matter.

Your slow subscriber sleeps on its own dedicated thread. publishOn still receives all of the data, puts it in an internal Queue, and schedules itself on the slow thread to drain that queue. The drain loop is then blocked for 5s on each iteration.

Still, the operator has seen and is capable of processing all of the data.
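
To make the distinction concrete, here is a minimal plain-Java sketch of the behavior described above. It is a simplified model, not Reactor's implementation: the class ReplayModel and its methods are hypothetical names invented for illustration, and each subscriber's "inbox" stands in for publishOn's internal queue, assumed here to be large enough that it never fills (the example only emits 15 items). The bounded history is only consulted when a subscriber arrives late.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of replay(n) followed by a queueing operator; not Reactor code.
public class ReplayModel {

  // Bounded history, used only to catch up subscribers that arrive late.
  private final Deque<Integer> history = new ArrayDeque<>();
  private final int historySize;
  // One inbox per subscriber, standing in for publishOn's internal queue (assumed never full).
  private final List<Deque<Integer>> inboxes = new ArrayList<>();

  public ReplayModel(int historySize) {
    this.historySize = historySize;
  }

  // Subscribing first copies the current bounded history, then registers for live items.
  public Deque<Integer> subscribe() {
    Deque<Integer> inbox = new ArrayDeque<>(history);
    inboxes.add(inbox);
    return inbox;
  }

  // Emitting trims the history to its bound, but every existing inbox still gets the item.
  public void emit(int value) {
    if (historySize > 0) {
      if (history.size() == historySize) {
        history.removeFirst();
      }
      history.addLast(value);
    }
    for (Deque<Integer> inbox : inboxes) {
      inbox.addLast(value);
    }
  }

  public static void main(String[] args) {
    ReplayModel model = new ReplayModel(8);
    Deque<Integer> early = model.subscribe(); // subscribed before any emission
    for (int i = 1; i <= 15; i++) {
      model.emit(i);
    }
    Deque<Integer> late = model.subscribe();  // subscribed after 15 items
    System.out.println("Early subscriber starts at " + early.peekFirst()); // prints 1
    System.out.println("Late subscriber starts at " + late.peekFirst());   // prints 8
  }
}
```

In this model the early subscriber, however slowly it drains its inbox, still finds item 1 waiting for it, while only the late subscriber is limited to the last 8 items. That mirrors the situation in the question, where both subscribers are attached before connect().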
