-->

how to handle RxAndroid errors in the main thread

2019-04-25 09:35发布

问题:

I'm new to rxJava/Android and surprised that my onError lambda is sometimes called on the main-thread and sometimes not, although I use .observeOn(AndroidSchedulers.mainThread())

Example 1: onError on main-thread
this works as expected: onError is called on the main-thread

Observable.error(new RuntimeException("RTE"))
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(s -> {
                Log.e(TAG, "onNext("+s+")-thread: " + Thread.currentThread().getName());
            },
            throwable -> {
                Log.e(TAG, "onError()-thread: " + Thread.currentThread().getName());
            });

log-output:

onError()-thread: main

Example 2: onError NOT on main-thread

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("one and only");
    }
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.timeout(1, TimeUnit.SECONDS)
.subscribe(s -> {
        Log.e(TAG, "onNext("+s+")-thread: " + Thread.currentThread().getName());
    },
    throwable -> {
        Log.e(TAG, "onError()-thread: " + Thread.currentThread().getName());
    });

the output is something like this:

onNext(one and only)-thread: main
onError()-thread: RxComputationScheduler-4       

I thought that after calling observeOn(AndroidSchedulers.mainThread()), ALL emissions should be done on the main-thread.

so I have these questions:

  1. I could not find any documentation that specifies under what circumstances onError is called in which thread. Anyone knows a link?
  2. I do of course want to display some error-indication in the GUI: so how can I force onError to be ALWAYS called in the main-thread?

回答1:

I just found out that I only need to change the order of calls. When I call observeOn after timeout it works as expected:

.timeout(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())

log-output

onNext(one and only)-thread: main
onError()-thread: main

The reason is, that observeOn will only affect everything below the call, and only until some other operator changes the thread again. In the example above, timeout() will change to the computation thread.

Note, that subscribeOn works differently. It does not matter where in the chain you call it.
You should only call it once (when you call it multiple times the first call wins: see "Multiple subscribeOn" in this Blog)

Here is a nice blog-post with more details: RxJava- Understanding observeOn() and subscribeOn()



回答2:

Because .timeout(1, TimeUnit.SECONDS) by default operates on Schedulers.computation().