I was playing with the code of this answer and it works smoothly. However, if an exception is thrown, the caller code does not catch it.
How is an Exception captured in reactor 2.0 streams? What I want to do is: if an Exception is thrown, stream processing must stop. I need to throw the Exception up in the caller thread (the one that created the steam in first place).
List<Map<String, Object>> data = readData();
Streams.from(data)
.flatMap(m -> Streams.just(m)
.dispatchOn(Environment.cachedDispatcher())
.map(ignored -> {throw new RuntimeException("kaboom!");}))
.buffer()
.consume(s -> System.out.println("s: " + s));
// the exception is not thrown and there is not opportunity to deal with it.
In Reactor you just need to wrap exceptions and return them as
Flux.error()
Then you can handle them in
onErrorXXX
methods (eg.onErrorResume
)See more:
https://projectreactor.io/docs/core/release/reference/#error.handling