How to handle exceptions thrown by observer's

2019-01-17 14:44发布

问题:

Consider the following example:

Observable.range(1, 10).subscribe(i -> {
    System.out.println(i);

    if (i == 5) {
        throw new RuntimeException("oops!");
    }
}, Throwable::printStackTrace);

This outputs numbers from 1 to 5 and then prints the exception.

What I want to achieve is make the observer stay subscribed and continue to run after throwing an exception, i.e. print all numbers from 1 to 10.

I have tried using retry() and other various error handling operators, but, as said in the documentation, their purpose is handling errors emitted by the observable itself.

The most straightforward solution is just to wrap the whole body of onNext into a try-catch block, but that doesn't sound like a good solution to me. In the similar Rx.NET question, the proposed solution was to make an extension method which would do the wrapping by creating a proxy observable. I tried to remake it:

Observable<Integer> origin = Observable.range(1, 10);
Observable<Integer> proxy = Observable.create((Observable.OnSubscribe<Integer>) s ->
        origin.subscribe(i -> {try { s.onNext(i); } catch (Exception ignored) {}}, s::onError, s::onCompleted));

proxy.subscribe(i -> {
    System.out.println(i);

    if (i == 5) {
        throw new RuntimeException("oops!");
    }
}, Throwable::printStackTrace);

This does not change anything, because RxJava itself wraps the subscriber into a SafeSubscriber. Using unsafeSubscribe to get around it doesn't seem to be a good solution either.

What can I do to solve this problem?

回答1:

This is a common question that arises when learning Rx.

TL;DR

Your suggestion to put your exception handling logic in the subscriber is preferable over creating a generic observable wrapper.

Explanation

Remember, that Rx is about pushing events to subscribers.

From the observable interface, it's clear there's not really anything an observable can know about it's subscribers other than how long they took to handle an event, or the information contained in any thrown exceptions.

A generic wrapper to handle subscriber exceptions and carry on sending events to that subscriber is a bad idea.

Why? Well the observable should only really know that the subscriber is now in an unknown failure state. To carry on sending events in this situation is unwise - perhaps, for example, the subscriber is in a condition where every event from this point forward is going to throw an exception and take a while to do it.

Once a subscriber has thrown an exception, there are only two viable courses of action for the observable:

  • Re-throw the exception
  • Implement generic handling to log the failure and stop sending it events (of any kind) and clean up any resources due to that subscriber and carry on with any remaining subscriptions.

Specific handling of subscriber exceptions would be a poor design choice; it would create inappropriate behavioural coupling between subscriber and observable. So if you want to be resilient to bad subscribers the two choices above are really the sensible limit of responsibility of the observable itself.

If you want your subscriber to be resilient and carry on, then you should absolutely wrap it in exception handling logic designed to handle the specific exceptions you know how to recover from (and perhaps to handle transient exceptions, logging, retry logic, circuit breaking etc.).

Only the subscriber itself will have the context to understand whether it is fit to receive further events in the face of failure.

If your situation warrants developing reusable error handling logic, put yourself in the mindset of wrapping the observer's event handlers rather than the observable - and do take care not to blindly carry on transmitting events in the face of failure. Release It! whilst not written about Rx, is an entertaining software engineering classic has plenty to say on this last point. If you've not read it, I highly advise it.