RxJava 2: always unsubscribe on the .subscribeOn(.

2019-09-06 14:14发布

问题:

I have an Observable<String> that performs some work. After its done, it closes its connections (via .setDisposable(Disposables.fromAction(logic*);. Trouble is, I need this close action to be performed on the same scheduler as the actual workload.

Is it possible? How?

I dont want to force the Observable to perform on a given scheduler, e.g. myObservable.subscribeOn(x).unsubscribeOn(x);;

回答1:

You need a single threaded scheduler for that to work and you can get one via Schedulers.from(Executor) or new SingleScheduler (warning: internal).

The problem with unsubscribeOn is that it runs only if the downstream actually disposes/cancels the stream (unlike 1.x where this happens almost all the time).

A better way is to use using with the aforementioned custom scheduler and manually schedule the cleanup:

Observable<T> createObservable(Scheduler scheduler) {
    return Observable.create(s -> {
        Resource res = ...

        s.setCancellable(() -> 
            scheduler.scheduleDirect(() ->
                res.close() // may need try-catch here
            )
        );

        s.onNext(...);
        s.onComplete();
    }).subscribeOn(scheduler);
}

Scheduler scheduler = new SingleScheduler();

createObservable(scheduler)
.map(...)
.filter(...)
.subscribe(...);

But note that unless the create logic gives up the scheduler (by terminating or going async), the cancellation logic may not ever execute due to same-pool livelock.