How to ignore error and continue infinite stream?

2019-02-02 21:12发布

I would like to know how to ignore exceptions and continue infinite stream (in my case stream of locations)?

I'm fetching current user position (using Android-ReactiveLocation) and then sending them to my API (using Retrofit).

In my case, when exception occurs during network call (e.g. timeout) onError method is invoked and stream stops itself. How to avoid it?

Activity:

private RestService mRestService;
private Subscription mSubscription;
private LocationRequest mLocationRequest = LocationRequest.create()
            .setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
            .setInterval(100);
...
private void start() {
    mRestService = ...;
    ReactiveLocationProvider reactiveLocationProvider = new ReactiveLocationProvider(this);
    mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
            .buffer(50)
            .flatMap(locations -> mRestService.postLocations(locations)) // can throw exception
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}

RestService:

public interface RestService {
    @POST("/.../")
    Observable<Response> postLocations(@Body List<Location> locations);
}

7条回答
手持菜刀,她持情操
2楼-- · 2019-02-02 21:54

Try calling the rest service in a Observable.defer call. That way for every call you'll get a chance to use its own 'onErrorResumeNext' and the errors won't cause your main stream to complete.

reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
  .buffer(50)
  .flatMap(locations ->
    Observable.defer(() -> mRestService.postLocations(locations))
      .onErrorResumeNext(<SOME_DEFAULT_TO_REACT_TO>)
  )
........

That solution is originally from this thread -> RxJava Observable and Subscriber for skipping exception?, but I think it will work in your case too.

查看更多
登录 后发表回答