How to continue processing after an error happens

2019-05-18 00:33发布

问题:

I have a PublishSubject and a Subscriber which I use to process a (possibly) infinite stream of preprocessed data. The problem is that some of the elements might contain some error. I'd like to ignore them and continue processing. How can I do so? I've tried something like this:

    val subject = PublishSubject.create<String>()
    subject.retry().subscribe({
        println("next: $it")
    }, {
        println("error")
    }, {
        println("complete")
    })

    subject.onNext("foo")
    subject.onNext("bar")
    subject.onError(RuntimeException())
    subject.onNext("wom")
    subject.onComplete()

My problem is that none of the error handling methods help me out here:

onErrorResumeNext() — instructs an Observable to emit a sequence of items if it encounters an error

onErrorReturn( ) — instructs an Observable to emit a particular item when it encounters an error

onExceptionResumeNext( ) — instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)

retry( ) — if a source Observable emits an error, resubscribe to it in the hopes that it will complete without error

retryWhen( ) — if a source Observable emits an error, pass that error to another Observable to determine whether to resubscribe to the source

I tried retry() for example but it hangs my process after the error indefinitely.

I also tried onErrorResumeNext() but it does not work as expected:

    val backupSubject = PublishSubject.create<String>()
    val subject = PublishSubject.create<String>()
    var currentSubject = subject
    subject.onErrorResumeNext(backupSubject).subscribe({
        println("next: $it")
    }, {
        println("error")
        currentSubject = backupSubject
    }, {
        println("complete")
    })

    backupSubject.subscribe({
        println("backup")
    }, {
        println("backup error")
    })

    currentSubject.onNext("foo")
    currentSubject.onNext("bar")
    currentSubject.onError(RuntimeException())
    currentSubject.onNext("wom")
    currentSubject.onComplete()

This only prints foo and bar.

回答1:

If you want to continue processing after an error, it means your error is a value just like your Strings and should go through onNext. To ensure type safety in this case, you should use some form of wrapper that can either take a regular value or an error; for example, the io.reactivex.Notification<T> is available in RxJava 2:

PublishSubject<Notification<String>> subject = PublishSubject.create();

subject.subscribe(System.out::println);

subject.onNext(Notification.createOnNext("Hello"));
subject.onNext(Notification.<String>createOnError(new RuntimeException("oops")));
subject.onNext(Notification.createOnNext("World"));