RxJava: ignore error and continue in chain

Posted 2019-08-20 03:38

Question:

I have an RxJava chain in which an error can be thrown. For some specific errors only, I would like the chain to ignore the error, continue, and still reach the subscribe.

For example, in this code:

authDataManager
                    .getUserAccessToken(username, password)
                    .subscribeOn(Schedulers.io())
                    .doOnNext({
                        authDataManager.saveUserAccessToken(it)
                    })
                    .flatMap {
                        appDataManager.getStations()
                    }
                    .doOnNext({
                        appDataManager.persistStations(it)
                    })
                    .flatMap {
                        appDataManager.getDriverInformation()
                    }
                    .doOnNext({
                        appDataManager.persistDriverInformation(it)
                    })
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(
                            {
                                onLoginSuccess()
                            },
                            {
                                onLoginFailure(it)
                            }
                    )

If appDataManager.getStations() throws an error, I still want the chain to continue and reach the onLoginSuccess() method.

However, if getUserAccessToken(username, password) fails, onLoginFailure should be called.

I have tried adding onErrorResumeNext() and onExceptionResumeNext() both after the flatMap calls and inside them, but when I do that, the chain just exits and never reaches the subscribe.

Answer 1:

I think you can use the onErrorResumeNext operator:

              authDataManager
                    .getUserAccessToken(username, password)
                    .subscribeOn(Schedulers.io())
                    .doOnNext({
                        authDataManager.saveUserAccessToken(it)
                    })
                    .flatMap {
                        appDataManager.getStations()
                             .onErrorResumeNext(Observable.empty())
                    }
                    .doOnNext({
                        appDataManager.persistStations(it)
                    })
                    .flatMap {
                        appDataManager.getDriverInformation()
                    }
                    .doOnNext({
                        appDataManager.persistDriverInformation(it)
                    })
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(
                            {
                                onLoginSuccess()
                            },
                            {
                                onLoginFailure(it)
                            }
                    )
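
One thing worth checking with this approach: falling back to Observable.empty() swallows the error, but it also means the flatMap emits nothing, so the operators after it and the onNext callback in subscribe are skipped and only completion is signalled. The sketch below is a minimal illustration of that difference, assuming RxJava 2 (`io.reactivex`). The three source functions and `runChain` are hypothetical stand-ins for the question's data managers, not part of the original code.

```kotlin
import io.reactivex.Observable

// Hypothetical stand-ins for the question's data managers.
fun getUserAccessToken(): Observable<String> = Observable.just("token")
fun getStations(): Observable<List<String>> =
    Observable.error<List<String>>(IllegalStateException("stations unavailable"))
fun getDriverInformation(): Observable<String> = Observable.just("driver")

// Runs a reduced login chain, applying `recover` to getStations(),
// and records which subscriber callbacks fire.
fun runChain(recover: (Observable<List<String>>) -> Observable<List<String>>): List<String> {
    val events = mutableListOf<String>()
    getUserAccessToken()
        .flatMap { recover(getStations()) }
        .flatMap { getDriverInformation() }
        .subscribe(
            { events.add("onNext") },
            { events.add("onError") },
            { events.add("onComplete") }
        )
    return events
}

fun main() {
    // Empty fallback: the error is swallowed, but nothing is emitted downstream.
    println(runChain { it.onErrorResumeNext(Observable.empty<List<String>>()) })
    // prints [onComplete]

    // Default-value fallback: the rest of the chain and onNext still run.
    println(runChain { it.onErrorReturnItem(emptyList<String>()) })
    // prints [onNext, onComplete]
}
```

If onLoginSuccess() has to run from the onNext callback, a default value such as an empty list may therefore be a better fit than an empty stream. Alternatively, onLoginSuccess() could be moved to the completion callback.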


Answer 2:

I figured this out in Kotlin here (stackoverflow link)

It can be done by using a combination of mapping, filtering, optionals, and onErrorResumeNext. I think this is about as graceful as it gets with the current version of RxJava.

authDataManager
    .getUserAccessToken(username, password)
    .subscribeOn(Schedulers.io())
    .doOnNext({
        authDataManager.saveUserAccessToken(it)
    })
    .flatMap {
        appDataManager.getStations()
            // Wrap each result so that a failure can be represented as an empty Optional
            .map { stations -> Optional.of(stations) }
            // Assumes getStations() returns an Observable, as the surrounding flatMap requires
            .onErrorResumeNext(Observable.just(Optional.empty()))
    }
    // Drop the empty marker left by a failed call, then unwrap the value
    .filter { optional -> optional.isPresent }
    .map { optional -> optional.get() }
    .doOnNext({
        appDataManager.persistStations(it)
    })
    .flatMap {
        appDataManager.getDriverInformation()
    }
    .doOnNext({
        appDataManager.persistDriverInformation(it)
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        {
            onLoginSuccess()
        },
        {
            onLoginFailure(it)
        }
    )
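
The Optional-wrapping step can be tried in isolation. This is a minimal sketch, assuming RxJava 2 (`io.reactivex`) and `java.util.Optional`. `loadStations` and `stationsOrNothing` are hypothetical names used only for illustration.

```kotlin
import io.reactivex.Observable
import java.util.Optional

// Hypothetical station source that fails when `fail` is true.
fun loadStations(fail: Boolean): Observable<List<String>> =
    if (fail) Observable.error<List<String>>(IllegalStateException("stations unavailable"))
    else Observable.just(listOf("A", "B"))

// Wraps each result in an Optional, turns an error into Optional.empty(),
// then drops the empty marker and unwraps the remaining values.
fun stationsOrNothing(fail: Boolean): Observable<List<String>> =
    loadStations(fail)
        .map { Optional.of(it) }
        .onErrorResumeNext(Observable.just(Optional.empty<List<String>>()))
        .filter { it.isPresent }
        .map { it.get() }

fun main() {
    println(stationsOrNothing(fail = false).toList().blockingGet()) // prints [[A, B]]
    println(stationsOrNothing(fail = true).toList().blockingGet())  // prints []
}
```

In the failing case the stream completes without emitting anything, because the filter removes the empty Optional. Anything that must run regardless of the failure would have to act on the Optional before that filter.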


Answer 3:

Standing on the shoulders of @savepopulation's answer, and using the observation that you have three distinct observer chains:

         authDataManager
                .getUserAccessToken(username, password)
                .subscribeOn(Schedulers.io())
                .doOnNext({
                    authDataManager.saveUserAccessToken(it)
                })
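                .doOnNext {
                    // Side chain: load and persist stations, swallowing any error
                    appDataManager.getStations()
                         .onErrorResumeNext(Observable.empty())
                         .subscribe { appDataManager.persistStations(it) }
                }
                .doOnNext {
                    // Side chain: load and persist driver information
                    appDataManager.getDriverInformation()
                         .subscribe { appDataManager.persistDriverInformation(it) }
                }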
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(
                        {
                            onLoginSuccess()
                        },
                        {
                            onLoginFailure(it)
                        }
                )

The main observer chain flows to completion, while the two subsidiary chains are not involved in the login at all; they are simply triggered by getting the user access token.

If you need those subsidiary branches to run asynchronously to the main chain, you will have to add subscribeOn()/observeOn() operators.
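
A side branch moved onto its own scheduler might look like the following. This is a minimal sketch assuming RxJava 2 (`io.reactivex`). `persistOnIo`, the hard-coded values, and the latch are hypothetical scaffolding so the example can run outside Android.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Triggers a side branch from doOnNext and returns the name of the
// thread on which the side branch delivered ("persisted") its value.
fun persistOnIo(): String {
    val done = CountDownLatch(1)
    var persistThread = ""

    Observable.just("token") // stands in for getUserAccessToken(...)
        .doOnNext {
            Observable.just(listOf("A", "B")) // stands in for getStations()
                .subscribeOn(Schedulers.io()) // run the side branch off the caller's thread
                .onErrorResumeNext(Observable.empty<List<String>>())
                .subscribe {
                    persistThread = Thread.currentThread().name
                    done.countDown()
                }
        }
        .subscribe { println("login continued on ${Thread.currentThread().name}") }

    // Wait for the side branch only so that the result can be inspected here.
    done.await(5, TimeUnit.SECONDS)
    return persistThread
}

fun main() {
    println("stations persisted on ${persistOnIo()}")
}
```

The inner subscribe calls return Disposables that this sketch discards. In an application they would normally be collected (for example in a CompositeDisposable) so that the side branches can be cancelled together with the login flow.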