Given the following piece of code
public static void main(String[] args) {
long start = System.currentTimeMillis();
Flux.<Long>generate(s -> s.next(System.currentTimeMillis() - start))
.flatMap(DemoApp::delayedAction)
.doOnNext(l -> System.out.println(l + " -- " + (System.currentTimeMillis() - start)))
.blockLast(Duration.ofSeconds(3));
}
private static Publisher<? extends Long> delayedAction(Long l) {
return Mono.just(l).delayElement(Duration.ofSeconds(1));
}
One can see from the output that a large number of events are "processed" in delayedAction
concurrently.
In this example 256 events get generated within a few milliseconds and then wait about a second until they get emitted again.
I want to limit this number to e.g. 10, how can I do this?
The solution should be independent of what happens inside delayedAction
Background
What is really happening in delayed Action is: I do HTTP requests and starting an unlimited (or very large) amount of requests doesn't sound like a good idea.
There is already a method for this: Flux.flatMap(Function> mapper, int concurrency)
From its documentation: