I have an Observable<String> that performs some work. After it's done, it closes its connections (via .setDisposable(Disposables.fromAction(logic*))). Trouble is, I need this close action to be performed on the same scheduler as the actual workload.
Is it possible? How?
I don't want to force the Observable to perform on a given scheduler, e.g. myObservable.subscribeOn(x).unsubscribeOn(x);
You need a single-threaded scheduler for that to work. You can get one via Schedulers.from(Executor), given a single-threaded Executor, or via new SingleScheduler (warning: internal).
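To illustrate why a single-threaded scheduler gives the "same thread" guarantee, here is a minimal sketch using only java.util.concurrent. It shows just the Executor side; in RxJava the same executor would be handed to Schedulers.from(executor). The class name, method name, and thread name are hypothetical, and the two submitted tasks merely stand in for the real workload and the close action.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SameThreadCleanupDemo {

    // Submits a "work" task and a "cleanup" task to one single-threaded
    // executor and returns the name of the thread that ran each of them.
    static String[] runWorkThenCleanup() {
        // Exactly one worker thread, so every task lands on the same thread.
        // (Assumption: in RxJava this executor would be wrapped with
        // Schedulers.from(executor).)
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "resource-thread"); // hypothetical name
            t.setDaemon(true);
            return t;
        });
        try {
            // Stand-in for the actual workload.
            String workThread =
                executor.submit(() -> Thread.currentThread().getName()).get();
            // Stand-in for the close action scheduled later.
            String cleanupThread =
                executor.submit(() -> Thread.currentThread().getName()).get();
            return new String[] { workThread, cleanupThread };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = runWorkThenCleanup();
        System.out.println("work ran on:    " + names[0]);
        System.out.println("cleanup ran on: " + names[1]);
    }
}
```

With a multi-threaded pool the two names could differ, which is why a single worker thread is the requirement here.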
The problem with unsubscribeOn is that it runs only if the downstream actually disposes/cancels the stream (unlike 1.x, where this happens almost all the time).
A better way is to use using, or create with setCancellable as in the example that follows, together with the aforementioned custom scheduler, and to schedule the cleanup manually:
    Observable<T> createObservable(Scheduler scheduler) {
        // The explicit <T> keeps the chained call from being inferred as Observable<Object>
        return Observable.<T>create(s -> {
            Resource res = ...

            // Run the cleanup on the same scheduler as the work
            s.setCancellable(() ->
                scheduler.scheduleDirect(() ->
                    res.close() // may need try-catch here
                )
            );

            s.onNext(...);
            s.onComplete();
        }).subscribeOn(scheduler);
    }

    Scheduler scheduler = new SingleScheduler(); // internal class
    createObservable(scheduler)
        .map(...)
        .filter(...)
        .subscribe(...);
But note that unless the create logic gives up the scheduler (by terminating or going async), the cancellation logic may never execute due to same-pool livelock.
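The caveat can be reproduced without RxJava. The following is a rough sketch using a plain single-threaded ExecutorService as a stand-in for the scheduler; the class name and event strings are hypothetical. The running task queues the cleanup on its own executor and then keeps the thread busy, so the cleanup cannot start until the task returns. The busy loop is bounded here only so that the demo terminates; a body that never returns would starve the cleanup forever.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SamePoolStarvationDemo {

    // Returns the order of events when a task on a single-threaded executor
    // queues its own cleanup on that executor and then keeps working.
    static List<String> run() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.submit(() -> {
                events.add("work started");

                // Stand-in for a cancellation arriving mid-work: the cleanup
                // is queued behind the task that is currently running.
                executor.execute(() -> events.add("cleanup ran"));

                // Stand-in for a body that does not give up the thread.
                long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(100);
                while (System.nanoTime() < deadline) {
                    // busy work; the only worker thread stays occupied
                }
                events.add("work finished");
            }).get();

            // Already queued tasks still run after shutdown(); wait for them.
            executor.shutdown();
            executor.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown(); // no-op if already shut down
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        // prints [work started, work finished, cleanup ran]
        System.out.println(run());
    }
}
```

The cleanup only runs once the work has released the thread, which matches the behaviour described above for a create body that neither terminates nor goes async.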