I'm trying to figure out Project Reactor and I'm looking for a way to cancel Subscriptions.
I know that after subscribing to, for example, a Flux, I get a reference to a Cancellation object that can be used to send the onCancel signal. But that reference only exists after subscribing, and I need to hold it in some kind of collection.
Is there a better way to get the Cancellation object, or just to cancel Subscriptions? Maybe some kind of place that holds references to all active Subscriptions? That would be awesome...
In Reactor, it makes no sense to cancel a Subscription before you've called subscribe(), as it is that very method that creates the Subscription and propagates that signal up the chain to start the emission of data.
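To illustrate that ordering, here is a minimal sketch of cancelling through the handle returned by subscribe(). It assumes Reactor 3.1 or later, where that handle is a Disposable (the type that replaced the Cancellation mentioned in the question). The class name, the interval source, and the timings are made up for the example.

```java
import java.time.Duration;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;

public class CancelAfterSubscribe {

    // Subscribes to an endless source, lets it run briefly, then cancels it.
    // Returns whether the handle reports itself as disposed afterwards.
    static boolean subscribeThenCancel() {
        // The handle only exists once subscribe() has been called.
        Disposable handle = Flux.interval(Duration.ofMillis(10))
                .subscribe(i -> System.out.println("tick " + i));
        try {
            Thread.sleep(50); // let a few ticks through (illustrative delay)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        handle.dispose(); // sends cancel() up the chain
        return handle.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println("disposed: " + subscribeThenCancel());
    }
}
```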
There is no centralized place holding all subscriptions. That wouldn't make much sense, because you'd need a way of finding the specific subscriptions you want to cancel (and keep in mind that each operator in your chain can use an intermediate Subscription as well).
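If you do want to cancel a known group of subscriptions together, one possible approach is to collect the handles yourself. The sketch below assumes Reactor 3.1 or later and uses Disposables.composite() as the container. SubscriptionRegistry and its methods are hypothetical names for this example, not something Reactor provides.

```java
import java.time.Duration;

import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;

// Hypothetical helper: an application-owned registry of subscription handles.
public class SubscriptionRegistry {

    private final Disposable.Composite active = Disposables.composite();

    // Remembers a handle returned by subscribe() and hands it back.
    Disposable track(Disposable handle) {
        active.add(handle);
        return handle;
    }

    // Cancels every tracked subscription at once.
    void cancelAll() {
        active.dispose();
    }

    // Small demonstration: two never-ending subscriptions, cancelled together.
    static boolean demo() {
        SubscriptionRegistry registry = new SubscriptionRegistry();
        Disposable ticks = registry.track(
                Flux.interval(Duration.ofMillis(10)).subscribe());
        Disposable idle = registry.track(Flux.never().subscribe());
        registry.cancelAll();
        return ticks.isDisposed() && idle.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println("all cancelled: " + demo());
    }
}
```

Only the handles you register are covered, so deciding which subscriptions belong in such a group stays with your code.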
Note that some operators will also cancel subscriptions on your behalf. That is the case for take(long), for instance, which will cancel upstream once enough items have been emitted:
Flux.just(1, 2, 3, 4).log().take(2).subscribe(System.out::println);
will output:
14:17:48.729 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | onNext(1)
1
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | onNext(2)
2
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | cancel()
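For comparison, a subscriber can issue the same kind of cancel itself. The following is a rough sketch of that idea using BaseSubscriber, not a description of how take is implemented. The class and method names are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class TakeTwoByHand {

    // Collects items from a four-element Flux and cancels after the second one.
    static List<Integer> firstTwo() {
        List<Integer> received = new ArrayList<>();
        Flux.just(1, 2, 3, 4).subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnNext(Integer value) {
                received.add(value);
                if (received.size() == 2) {
                    cancel(); // stop the source; 3 and 4 are not delivered
                }
            }
        });
        // Flux.just emits synchronously during subscribe(), so the list is
        // already filled when subscribe() returns.
        return received;
    }

    public static void main(String[] args) {
        System.out.println(firstTwo());
    }
}
```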