Rx Java mergeDelayError not working as expected

2019-01-25 05:07发布

I'm using RxJava in and Android application with RxAndroid. I'm using mergeDelayError to combine two retro fit network calls into one observable which will process emitted items if either emits one and the error if either has one. This is not working and it is only firing off the onError action when either encounters an error. Now to test this I shifted to a very simple example and still the successAction is never called when I have an onError call. See example below.

Observable.mergeDelayError(
                Observable.error(new RuntimeException()),
                Observable.just("Hello")
            )
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .finallyDo(completeAction)
            .subscribe(successAction, errorAction);

The success action will only be called if I use two success observables. Am I missing something with how mergeDelayError is supposed to work?

EDIT:

I've found that if I remove the observeOn and subscribeOn everything works as expected. I need to specify threads and thought that was the whole point of using Rx. Any idea why specifying those Schedulers would break the behavior?

3条回答
【Aperson】
2楼-- · 2019-01-25 05:34

I think you don't wait for the terminal event and the main thread quits before the events are delivered to your observer. The following test passes for me with RxJava 1.0.14:

@Test
public void errorDelayed() {
    TestSubscriber<Object> ts = TestSubscriber.create();
    Observable.mergeDelayError(
            Observable.error(new RuntimeException()),
            Observable.just("Hello")
        )
        .subscribeOn(Schedulers.io()).subscribe(ts);

    ts.awaitTerminalEvent();

    ts.assertError(RuntimeException.class);
    ts.assertValue("Hello");
}
查看更多
男人必须洒脱
3楼-- · 2019-01-25 05:50

This still seems like a bug in the mergeDelayError operator but I was able to get it working by duplicating the observerOn and Subscribe on for each observable.

Observable.mergeDelayError(
            Observable.error(new RuntimeException())
                 .observeOn(AndroidSchedulers.mainThread())
                 .subscribeOn(Schedulers.io()),
            Observable.just("Hello")
                 .observeOn(AndroidSchedulers.mainThread())
                 .subscribeOn(Schedulers.io())
        )
        .finallyDo(completeAction)
        .subscribe(successAction, errorAction);
查看更多
干净又极端
4楼-- · 2019-01-25 05:55

Use .observeOn(AndroidSchedulers.mainThread(), true) instead of .observeOn(AndroidSchedulers.mainThread()

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
        return observeOn(scheduler, delayError, RxRingBuffer.SIZE);
    }

Above is the signature of observeOn function. Following code works.

  Observable.mergeDelayError(
                Observable.error(new RuntimeException()),
                Observable.just("Hello")
        )
                .observeOn(AndroidSchedulers.mainThread(), true)
                .subscribeOn(Schedulers.io())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(String s) {

                    }
                });

Got this trick from ConcatDelayError thread: https://github.com/ReactiveX/RxJava/issues/3908#issuecomment-217999009

查看更多
登录 后发表回答