I'm using RxJava in and Android application with RxAndroid. I'm using mergeDelayError to combine two retro fit network calls into one observable which will process emitted items if either emits one and the error if either has one. This is not working and it is only firing off the onError action when either encounters an error. Now to test this I shifted to a very simple example and still the successAction is never called when I have an onError call. See example below.
Observable.mergeDelayError(
Observable.error(new RuntimeException()),
Observable.just("Hello")
)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.finallyDo(completeAction)
.subscribe(successAction, errorAction);
The success action will only be called if I use two success observables. Am I missing something with how mergeDelayError is supposed to work?
EDIT:
I've found that if I remove the observeOn
and subscribeOn
everything works as expected. I need to specify threads and thought that was the whole point of using Rx. Any idea why specifying those Schedulers
would break the behavior?
Use .observeOn(AndroidSchedulers.mainThread(), true) instead of .observeOn(AndroidSchedulers.mainThread()
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
return observeOn(scheduler, delayError, RxRingBuffer.SIZE);
}
Above is the signature of observeOn function. Following code works.
Observable.mergeDelayError(
Observable.error(new RuntimeException()),
Observable.just("Hello")
)
.observeOn(AndroidSchedulers.mainThread(), true)
.subscribeOn(Schedulers.io())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
});
Got this trick from ConcatDelayError thread: https://github.com/ReactiveX/RxJava/issues/3908#issuecomment-217999009
This still seems like a bug in the mergeDelayError operator but I was able to get it working by duplicating the observerOn and Subscribe on for each observable.
Observable.mergeDelayError(
Observable.error(new RuntimeException())
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io()),
Observable.just("Hello")
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
)
.finallyDo(completeAction)
.subscribe(successAction, errorAction);
I think you don't wait for the terminal event and the main thread quits before the events are delivered to your observer. The following test passes for me with RxJava 1.0.14:
@Test
public void errorDelayed() {
TestSubscriber<Object> ts = TestSubscriber.create();
Observable.mergeDelayError(
Observable.error(new RuntimeException()),
Observable.just("Hello")
)
.subscribeOn(Schedulers.io()).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertError(RuntimeException.class);
ts.assertValue("Hello");
}