I have an RxJava chain in which an error can be thrown.
For only some specific errors, I would like the chain to ignore the error, continue, and still reach the subscribe.
For example, in this code:
authDataManager
    .getUserAccessToken(username, password)
    .subscribeOn(Schedulers.io())
    .doOnNext({
        authDataManager.saveUserAccessToken(it)
    })
    .flatMap {
        appDataManager.getStations()
    }
    .doOnNext({
        appDataManager.persistStations(it)
    })
    .flatMap {
        appDataManager.getDriverInformation()
    }
    .doOnNext({
        appDataManager.persistDriverInformation(it)
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        {
            onLoginSuccess()
        },
        {
            onLoginFailure(it)
        }
    )
If appDataManager.getStations() throws an error, I still want to continue and reach the onLoginSuccess() method.
However, if getUserAccessToken(username, password) fails, onLoginFailure should be called.
I have tried adding onErrorResumeNext() and onExceptionResumeNext() after the flatMaps and inside them, but when I do that the chain just exits and never reaches the subscribe.
I think you can use the onErrorResumeNext operator:
authDataManager
    .getUserAccessToken(username, password)
    .subscribeOn(Schedulers.io())
    .doOnNext({
        authDataManager.saveUserAccessToken(it)
    })
    .flatMap {
        appDataManager.getStations()
            .onErrorResumeNext(Observable.empty())
    }
    .doOnNext({
        appDataManager.persistStations(it)
    })
    .flatMap {
        appDataManager.getDriverInformation()
    }
    .doOnNext({
        appDataManager.persistDriverInformation(it)
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        {
            onLoginSuccess()
        },
        {
            onLoginFailure(it)
        }
    )
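One thing worth checking with this approach: an empty fallback suppresses the error, but it also emits no item, so anything downstream of the flatMap (including the onNext callback in subscribe) does not run. The following is a minimal sketch of that difference, assuming RxJava 2.x (package io.reactivex); failingStations() and runChain() are hypothetical stand-ins, not part of the original code.

```kotlin
import io.reactivex.Observable

// Hypothetical stand-in for appDataManager.getStations() that always fails.
fun failingStations(): Observable<List<String>> =
    Observable.error(RuntimeException("stations unavailable"))

// Runs a small chain with the given fallback and records which callbacks fire.
fun runChain(fallback: Observable<List<String>>): List<String> {
    val events = mutableListOf<String>()
    Observable.just("token")
        .flatMap { failingStations().onErrorResumeNext(fallback) }
        .subscribe(
            { events.add("onNext") },      // where onLoginSuccess() would be called
            { events.add("onError") },     // where onLoginFailure(it) would be called
            { events.add("onComplete") }
        )
    return events
}

fun main() {
    // Empty fallback: the error is swallowed, but no item reaches onNext.
    println(runChain(Observable.empty<List<String>>()))             // [onComplete]
    // Fallback item: the chain keeps going with a placeholder value.
    println(runChain(Observable.just(emptyList<String>())))         // [onNext, onComplete]
}
```

If onLoginSuccess() has to run even when the stations call fails, emitting a placeholder item (or moving the success handling to the completion callback) may be what is needed.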
I figured this out in Kotlin here (stackoverflow link).
It can be done by using a combination of mapping, filtering, optionals, and onErrorResumeNext. I think this is about as graceful as it gets with the current version of RxJava.
authDataManager
    .getUserAccessToken(username, password)
    .subscribeOn(Schedulers.io())
    .doOnNext({
        authDataManager.saveUserAccessToken(it)
    })
    .flatMap {
        // assumes getStations() returns an Observable
        appDataManager.getStations()
            // wrap a successful result in an Optional
            .map { stations -> Optional.of(stations) }
            // on error, emit an empty Optional instead of failing the chain
            .onErrorResumeNext { error: Throwable -> Observable.just(Optional.empty()) }
    }
    .filter { optional -> optional.isPresent }
    .map { optional -> optional.get() }
    .doOnNext({
        appDataManager.persistStations(it)
    })
    .flatMap {
        appDataManager.getDriverInformation()
    }
    .doOnNext({
        appDataManager.persistDriverInformation(it)
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        {
            onLoginSuccess()
        },
        {
            onLoginFailure(it)
        }
    )
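Since the question asks to ignore only some specific errors, the Optional idea can be combined with a check on the error type. The sketch below is one possible variant, assuming RxJava 2.x and java.util.Optional (which on Android needs API level 24 or later); toOptionalIgnoringIo() and describe() are hypothetical helpers, and IOException is only an example of an "ignorable" error. Unlike the filter step above, it keeps the empty Optional flowing so that the subscriber is still reached.

```kotlin
import io.reactivex.Observable
import io.reactivex.ObservableSource
import io.reactivex.functions.Function
import java.io.IOException
import java.util.Optional

// Wraps each item in an Optional. An IOException becomes Optional.empty();
// any other error is passed through unchanged.
fun <T : Any> Observable<T>.toOptionalIgnoringIo(): Observable<Optional<T>> =
    map { Optional.of(it) }
        .onErrorResumeNext(Function<Throwable, ObservableSource<Optional<T>>> { error ->
            if (error is IOException) Observable.just(Optional.empty<T>())
            else Observable.error<Optional<T>>(error)
        })

// Records what a subscriber would observe for a given source.
fun describe(source: Observable<String>): List<String> {
    val events = mutableListOf<String>()
    source.toOptionalIgnoringIo()
        // persist only when a value is present, but do not drop the item
        .doOnNext { if (it.isPresent) events.add("persist ${it.get()}") }
        .subscribe({ events.add("success") }, { events.add("failure") })
    return events
}

fun main() {
    println(describe(Observable.just("stations")))                           // [persist stations, success]
    println(describe(Observable.error<String>(IOException("offline"))))      // [success]
    println(describe(Observable.error<String>(IllegalStateException("bug")))) // [failure]
}
```

The explicit Function<...> wrapper is used because, from Kotlin, a bare lambda passed to onErrorResumeNext can be ambiguous between the function overload and the ObservableSource overload in RxJava 2.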
Standing on the shoulders of @savepopulation's answer, and using the observation that you have three distinct observer chains:
authDataManager
    .getUserAccessToken(username, password)
    .subscribeOn(Schedulers.io())
    .doOnNext({
        authDataManager.saveUserAccessToken(it)
    })
    .doOnNext({
        // subsidiary chain: an error from getStations() is swallowed here
        appDataManager.getStations()
            .onErrorResumeNext(Observable.empty())
            .subscribe({ appDataManager.persistStations(it) })
    })
    .doOnNext({
        // subsidiary chain: fetch and persist the driver information
        appDataManager.getDriverInformation()
            .subscribe({ appDataManager.persistDriverInformation(it) })
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        {
            onLoginSuccess()
        },
        {
            onLoginFailure(it)
        }
    )
The main observer chain flows to completion, while the two subsidiary chains are not involved in the login at all; they are simply triggered by getting the user access token.
If you need those subsidiary branches to run asynchronously to the main chain, you will have to add subscribeOn()/observeOn() operators.
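As a rough illustration of that last point, the sketch below moves a subsidiary chain onto an io() thread with subscribeOn(), assuming RxJava 2.x. sideChainThreadName() is a hypothetical helper, and the latch exists only so that the example can wait for the background work before returning.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference

// Returns the name of the thread on which the subsidiary chain delivered its item.
fun sideChainThreadName(): String? {
    val done = CountDownLatch(1)
    val threadName = AtomicReference<String>()

    Observable.just("token")
        .doOnNext {
            // Subsidiary chain: triggered by the token, but run on an io() thread.
            Observable.just(listOf("station A"))
                .subscribeOn(Schedulers.io())
                .onErrorResumeNext(Observable.empty<List<String>>())
                .subscribe {
                    // persistStations(it) would go here
                    threadName.set(Thread.currentThread().name)
                    done.countDown()
                }
        }
        .subscribe { /* onLoginSuccess() would go here */ }

    // Wait briefly for the background chain; only needed for this demonstration.
    done.await(2, TimeUnit.SECONDS)
    return threadName.get()
}

fun main() {
    println("main chain ran on: ${Thread.currentThread().name}")
    println("subsidiary chain ran on: ${sideChainThreadName()}")
}
```

Note that the Disposable returned by the inner subscribe is discarded here, so the subsidiary work cannot be cancelled if the login flow is abandoned; whether that is acceptable depends on the application.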