Handling exceptions inside Observable.fromCallable

Posted 2020-05-02 13:32

Question:

I have a situation where a long-running process is wrapped in an Observable.fromCallable(). This process is an OkHttp call and, if terminated, will throw an IOException. When the observable is subscribed to, the disposable is stored in a CompositeDisposable, and the exception is handled as expected. However, my code clears the CompositeDisposable in some cases, which terminates the OkHttp thread with no error handling in place and crashes the app with an unhandled exception. Here's a simple unit test example of this problem:

@Test
public void test(){
    CompositeDisposable compositeDisposable = new CompositeDisposable();
    Observable<Object> o = Observable.fromCallable(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            System.out.println("sleeping - this sleep will be interrupted when compositeDisposable gets cleared");
            Thread.sleep(3000);
            return null;
        }
    });
    compositeDisposable.add(o.subscribeOn(new IoScheduler()).subscribe());
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    compositeDisposable.clear();
}

Is there any way to work around this problem?

Answer 1:

Unlike RxJava 1, RxJava 2 will not deliver this exception to the subscriber's onError(), because you disposed of the subscription (here via compositeDisposable.clear()) and no longer want to receive notifications. An exception thrown after disposal has nowhere to go downstream, so by default it is now passed to Thread.currentThread().getUncaughtExceptionHandler().uncaughtException().

You can either wrap the code in a try/catch to handle the exceptions that may occur on cancellation, or override the default behavior with:

RxJavaPlugins.setErrorHandler(Functions.<Throwable>emptyConsumer()); 

or any other handling you would like.
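As one possible form of "any other handling", the sketch below swallows only the errors you would expect after a dispose and forwards everything else to the thread's uncaught exception handler. It is plain JDK code so it can be read on its own; the class and method names (UndeliverableErrorPolicy, isExpectedAfterDispose, handle) are hypothetical. It assumes RxJava 2 may hand the global handler a wrapper exception whose getCause() is the original error, and that you would install it with something like RxJavaPlugins.setErrorHandler(UndeliverableErrorPolicy::handle).

```java
import java.io.IOException;

// Hypothetical helper, not part of RxJava: decides which undeliverable errors to ignore.
class UndeliverableErrorPolicy {

    // Errors that typically just mean "the work was cancelled".
    private static boolean isCancellationType(Throwable t) {
        return t instanceof IOException || t instanceof InterruptedException;
    }

    // True if the error, or the error it wraps, looks like the result of a cancellation.
    public static boolean isExpectedAfterDispose(Throwable error) {
        Throwable cause = error.getCause();
        return isCancellationType(error) || (cause != null && isCancellationType(cause));
    }

    // Assumed wiring: RxJavaPlugins.setErrorHandler(UndeliverableErrorPolicy::handle);
    public static void handle(Throwable error) {
        if (isExpectedAfterDispose(error)) {
            return; // the stream was disposed, nobody is waiting for this error
        }
        // Anything else is likely a real bug: keep the default crash behavior.
        Thread current = Thread.currentThread();
        current.getUncaughtExceptionHandler().uncaughtException(current, error);
    }

    public static void main(String[] args) {
        System.out.println(isExpectedAfterDispose(new RuntimeException(new InterruptedException())));
        System.out.println(isExpectedAfterDispose(new IllegalStateException("real bug")));
    }
}
```

Compared with an empty consumer, a filter like this keeps genuine programming errors visible instead of silently discarding every undeliverable exception.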

You should also read the full explanation by akarnokd on the RxJava GitHub.
Also refer to this discussion for the solutions mentioned above.
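The try/catch option mentioned above could look roughly like the following. This is a minimal sketch using only the JDK: guarded is a hypothetical helper that wraps a Callable so that an interruption produces a fallback value instead of an exception, and an ExecutorService stands in for the RxJava scheduler thread. With RxJava you would presumably pass the wrapped callable to Observable.fromCallable(...); in that case the fallback should be non-null, since RxJava 2 rejects null items. For an OkHttp call you would likely catch IOException in the same way.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class GuardedCallableDemo {

    // Hypothetical helper: returns `fallback` if the task is interrupted while running.
    public static <T> Callable<T> guarded(Callable<T> task, T fallback) {
        return () -> {
            try {
                return task.call();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag set
                return fallback;
            }
        };
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);

        Callable<String> task = guarded(() -> {
            started.countDown();
            Thread.sleep(3000); // stands in for the long-running call
            return "done";
        }, "cancelled");

        Future<String> result = executor.submit(task);
        started.await();        // make sure the task is running
        executor.shutdownNow(); // interrupts the worker, much like disposing does

        System.out.println(result.get(1, TimeUnit.SECONDS)); // prints "cancelled"
    }
}
```

Because the exception never leaves the callable, there is nothing left for RxJava to route to the global error handler.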