I have a `PublishSubject` and a `Subscriber` which I use to process a (possibly) infinite stream of preprocessed data. The problem is that some of the elements might contain an error. I'd like to ignore them and continue processing. How can I do so? I've tried something like this:
    val subject = PublishSubject.create<String>()

    subject.retry().subscribe({
        println("next: $it")
    }, {
        println("error")
    }, {
        println("complete")
    })

    subject.onNext("foo")
    subject.onNext("bar")
    subject.onError(RuntimeException())
    subject.onNext("wom")
    subject.onComplete()
My problem is that none of the error-handling methods help me out here:
- `onErrorResumeNext()`: instructs an Observable to emit a sequence of items if it encounters an error
- `onErrorReturn()`: instructs an Observable to emit a particular item when it encounters an error
- `onExceptionResumeNext()`: instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)
- `retry()`: if a source Observable emits an error, resubscribe to it in the hopes that it will complete without error
- `retryWhen()`: if a source Observable emits an error, pass that error to another Observable to determine whether to resubscribe to the source
I tried `retry()`, for example, but it hangs my process indefinitely after the error.
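A likely explanation for the hang, as far as I can tell: once `onError` has been called, the subject is terminated, and every new subscriber is handed the stored error immediately. Since `retry()` resubscribes on each error, it would keep resubscribing to the same terminated subject forever. Here is a minimal sketch of that behavior, assuming the RxJava 2 package name (`io.reactivex`); `lateSubscriberEvents` is a hypothetical helper used only for illustration.

```kotlin
import io.reactivex.subjects.PublishSubject  // assumption: RxJava 2 package name

// Hypothetical helper: records what a subscriber sees when it arrives after the error.
fun lateSubscriberEvents(): List<String> {
    val events = mutableListOf<String>()
    val subject = PublishSubject.create<String>()

    // After this call the subject is terminated and remembers the error.
    subject.onError(RuntimeException("boom"))

    // A new subscriber (which is what retry() creates) receives the stored error right away.
    subject.subscribe(
        { events += "next: $it" },
        { events += "error: ${it.message}" }
    )

    // Ignored: a terminated subject no longer emits items.
    subject.onNext("wom")
    return events
}

fun main() {
    println(lateSubscriberEvents())  // prints [error: boom]
}
```

If this reading is right, no operator applied downstream can bring the same subject back to life after `onError`.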
I also tried `onErrorResumeNext()`, but it does not work as expected:
    val backupSubject = PublishSubject.create<String>()
    val subject = PublishSubject.create<String>()
    var currentSubject = subject

    subject.onErrorResumeNext(backupSubject).subscribe({
        println("next: $it")
    }, {
        println("error")
        currentSubject = backupSubject
    }, {
        println("complete")
    })

    backupSubject.subscribe({
        println("backup")
    }, {
        println("backup error")
    })

    currentSubject.onNext("foo")
    currentSubject.onNext("bar")
    currentSubject.onError(RuntimeException())
    currentSubject.onNext("wom")
    currentSubject.onComplete()
This only prints `foo` and `bar`.
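One direction that might give the "ignore and continue" behavior is to never call `onError` for a bad element at all, and instead push the failure through `onNext` as an ordinary value that gets filtered out downstream. The sketch below assumes RxJava 2 (`io.reactivex`); the `Item` wrapper and the `skipBadItems` function are hypothetical names introduced only for this example, not part of any library.

```kotlin
import io.reactivex.subjects.PublishSubject  // assumption: RxJava 2 package name

// Hypothetical wrapper: a faulty element travels as a value instead of a terminal onError.
sealed class Item {
    data class Ok(val value: String) : Item()
    data class Bad(val cause: Throwable) : Item()
}

// Hypothetical helper: replays the sequence from the question and records what the subscriber sees.
fun skipBadItems(): List<String> {
    val events = mutableListOf<String>()
    val subject = PublishSubject.create<Item>()

    subject
        .ofType(Item.Ok::class.java)   // keep only the good elements
        .map { it.value }              // unwrap them
        .subscribe(
            { events += "next: $it" },
            { events += "error" },
            { events += "complete" }
        )

    subject.onNext(Item.Ok("foo"))
    subject.onNext(Item.Ok("bar"))
    subject.onNext(Item.Bad(RuntimeException()))  // replaces subject.onError(...)
    subject.onNext(Item.Ok("wom"))
    subject.onComplete()
    return events
}

fun main() {
    skipBadItems().forEach(::println)
    // prints:
    // next: foo
    // next: bar
    // next: wom
    // complete
}
```

The trade-off is that the producer has to wrap every element, but the stream itself never terminates because of a single bad item.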