I'm new to rxJava/Android and surprised that my onError
lambda is sometimes called on the main-thread and sometimes not, although I use .observeOn(AndroidSchedulers.mainThread())
Example 1: onError
on main-thread
this works as expected: onError
is called on the main-thread
Observable.error(new RuntimeException("RTE"))
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> {
Log.e(TAG, "onNext("+s+")-thread: " + Thread.currentThread().getName());
},
throwable -> {
Log.e(TAG, "onError()-thread: " + Thread.currentThread().getName());
});
log-output:
onError()-thread: main
Example 2: onError
NOT on main-thread
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("one and only");
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.timeout(1, TimeUnit.SECONDS)
.subscribe(s -> {
Log.e(TAG, "onNext("+s+")-thread: " + Thread.currentThread().getName());
},
throwable -> {
Log.e(TAG, "onError()-thread: " + Thread.currentThread().getName());
});
the output is something like this:
onNext(one and only)-thread: main
onError()-thread: RxComputationScheduler-4
I thought that after calling observeOn(AndroidSchedulers.mainThread()), ALL emissions should be done on the main-thread.
so I have these questions:
- I could not find any documentation that specifies under what circumstances
onError
is called in which thread. Anyone knows a link? - I do of course want to display some error-indication in the GUI: so how can I force
onError
to be ALWAYS called in the main-thread?
I just found out that I only need to change the order of calls. When I call
observeOn
aftertimeout
it works as expected:log-output
The reason is, that
observeOn
will only affect everything below the call, and only until some other operator changes the thread again. In the example above,timeout()
will change to the computation thread.Note, that
subscribeOn
works differently. It does not matter where in the chain you call it.You should only call it once (when you call it multiple times the first call wins: see "Multiple subscribeOn" in this Blog)
Here is a nice blog-post with more details: RxJava- Understanding observeOn() and subscribeOn()
Because
.timeout(1, TimeUnit.SECONDS)
by default operates onSchedulers.computation()
.