Catch error if retryWhen's retries run out

Posted 2020-08-09 09:27

Question:

The example in the documentation for retryWhen goes like this:

Observable.create((Subscriber<? super String> s) -> {
  System.out.println("subscribing");
  s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
  return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
      System.out.println("delay retry by " + i + " second(s)");
      return Observable.timer(i, TimeUnit.SECONDS);
  });
}).toBlocking().forEach(System.out::println);

But how do I propagate the error if the retries run out?

Adding .doOnError(System.out::println) after the retryWhen clause does not catch the error. Is it even emitted?

Adding a .doOnError(System.out::println) before retryWhen prints "always fails" for every attempt.

Answer 1:

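The documentation for retryWhen says that when the Observable returned by your notification handler calls onError, retryWhen passes that onError notification on to its subscribers and terminates. So you can emit the error yourself on the last attempt, like this: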

    final int ATTEMPTS = 3;

    Observable.create((Subscriber<? super String> s) -> {
        System.out.println("subscribing");
        s.onError(new RuntimeException("always fails"));
    }).retryWhen(attempts -> attempts
            .zipWith(Observable.range(1, ATTEMPTS), (n, i) ->
                    i < ATTEMPTS ?
                            Observable.timer(i, TimeUnit.SECONDS) :
                            Observable.error(n))
            .flatMap(x -> x))
            .toBlocking()
            .forEach(System.out::println);


Answer 2:

The Javadoc for retryWhen states that:

If that Observable calls onComplete or onError then retry will call onCompleted or onError on the child subscription.

Put simply, if you want to propagate the exception, you'll need to rethrow the original exception once you've had enough retrying.

An easy way is to set your Observable.range to be 1 greater than the number of times you want to retry.

Then in your zip function test the current number of retries. If it's equal to NUMBER_OF_RETRIES + 1, return Observable.error(throwable) or re-throw your exception.

For example:

// NUMBER_OF_RETRIES is an int constant you define; Throwables comes from Guava.
Observable.create((Subscriber<? super String> s) -> {
    System.out.println("subscribing");
    s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
    return attempts.zipWith(Observable.range(1, NUMBER_OF_RETRIES + 1), (throwable, attempt) -> {
        if (attempt == NUMBER_OF_RETRIES + 1) {
            // Retries exhausted: rethrow so the error reaches the subscriber.
            throw Throwables.propagate(throwable);
        } else {
            return attempt;
        }
    }).flatMap(i -> {
        System.out.println("delaying retry by " + i + " second(s)");
        return Observable.timer(i, TimeUnit.SECONDS);
    });
}).toBlocking().forEach(System.out::println);

As an aside, doOnError does not affect the Observable in any way; it simply gives you a hook to perform some action when an error occurs. A common example is logging.
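
To make the counting in this answer concrete, here is a minimal plain-Java sketch of the same "retry N times, then propagate" rule. It deliberately does not use RxJava: the class RetryThenPropagate, the method callWithRetries and the log list are hypothetical names invented for illustration, and the actual delay is left out. It only models which attempt retries and which attempt rethrows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java model of "retry N times, then propagate the last error".
// This is an illustration only, not RxJava code.
class RetryThenPropagate {
    static final int NUMBER_OF_RETRIES = 3; // hypothetical constant, as in the answer

    // Calls source; on failure retries up to NUMBER_OF_RETRIES times,
    // then rethrows the failure from the final attempt.
    static <T> T callWithRetries(Supplier<T> source, List<String> log) {
        for (int attempt = 1; ; attempt++) {
            log.add("subscribing");
            try {
                return source.get();
            } catch (RuntimeException e) {
                if (attempt == NUMBER_OF_RETRIES + 1) {
                    throw e; // retries exhausted: propagate to the caller
                }
                log.add("delay retry by " + attempt + " second(s)");
                // A real implementation would wait here before retrying.
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        try {
            callWithRetries(() -> { throw new RuntimeException("always fails"); }, log);
        } catch (RuntimeException e) {
            log.add("propagated: " + e.getMessage());
        }
        log.forEach(System.out::println);
    }
}
```

With NUMBER_OF_RETRIES set to 3, the source is invoked four times (the initial attempt plus three retries), and the fourth failure is the one that reaches the caller. That mirrors why the range in the zipWith has to be one longer than the number of retries.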



Answer 3:

One option is to use Observable.materialize() to convert the Observable.range() items into notifications. Once the onCompleted() notification arrives, you can propagate the error downstream. In the sample below, Pair wraps each Observable.range() notification together with the exception from the source Observable.

@Test
public void retryWhen() throws Exception {
    Observable.create((Subscriber<? super String> s) -> {
        System.out.println("subscribing");
        s.onError(new RuntimeException("always fails"));
    }).retryWhen(attempts -> {
        // materialize() turns range's items and its completion into Notification objects.
        return attempts.zipWith(Observable.range(1, 3).materialize(), Pair::new)
            .flatMap(notifAndEx -> {
                if (notifAndEx.getRight().isOnCompleted()) {
                    // The range is exhausted: propagate the last error downstream.
                    return Observable.<Long>error(notifAndEx.getLeft());
                }
                int delay = notifAndEx.getRight().getValue();
                System.out.println("delay retry by " + delay + " second(s)");
                return Observable.timer(delay, TimeUnit.SECONDS);
            });
    }).toBlocking().forEach(System.out::println);
}

    private static class Pair<L,R> {
        private final L left;
        private final R right;

        public Pair(L left, R right) {
            this.left = left;
            this.right = right;
        }

        public L getLeft() {
            return left;
        }

        public R getRight() {
            return right;
        }
    }


Answer 4:

You can get the behaviour you want using the RetryWhen builder in rxjava-extras which is on Maven Central. Use the latest version.

Observable.create((Subscriber<? super String> s) -> {
    System.out.println("subscribing");
    s.onError(new RuntimeException("always fails"));
}) 
.retryWhen(RetryWhen
   .delays(Observable.range(1, 3)
               .map(n -> (long) n), 
            TimeUnit.SECONDS).build())
.doOnError(e -> e.printStackTrace()) 
.toBlocking().forEach(System.out::println);


Answer 5:

You need to use onErrorResumeNext after the retryWhen. Applied to your example:

    Observable.create((Subscriber<? super String> s) -> {
        System.out.println("subscribing");
        s.onError(new RuntimeException("always fails"));
    }).retryWhen(attempts -> {
        return attempts.zipWith(Observable.range(1, NUMBER_OF_RETRIES + 1), (n, i) -> {
            if (i == NUMBER_OF_RETRIES + 1) {
                throw Throwables.propagate(n);
            }
            else {
                return i;
            }
        }).flatMap(i -> {
            System.out.println("delay retry by " + i + " second(s)");
            return Observable.timer(i, TimeUnit.SECONDS);
        });
    })
    .onErrorResumeNext(t -> {
        System.out.println("Error after all retries:" + t.getMessage());
        return Observable.error(t);
    })
    .toBlocking().forEach(System.out::println);

At the bottom of this class you can see a practical example of how it works: https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/errors/ObservableExceptions.java



Answer 6:

You can either use the scan function to accumulate a pair holding the retry index and the latest throwable, and then decide whether or not to pass on the error:

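// Pair is assumed to be a tuple type with create(), first and second (for example android.util.Pair).
.retryWhen(attempts -> attempts
    .scan(Pair.<Integer, Throwable>create(0, null),
          (index, value) -> Pair.create(index.first + 1, value))
    .flatMap(pair -> {
        if (pair.first > MAX_RETRY_COUNT) {
            // Retry budget used up: fail with the last throwable as the cause.
            throw new RuntimeException(pair.second);
        }
        return Observable.timer(pair.first, TimeUnit.SECONDS);
    }))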

Or you can stick with the zipWith operator, but extend the range Observable by one and return a pair instead of the index alone. That way, you won't lose the information about the previous throwable.

attempts
    .zipWith(Observable.range(1, MAX_RETRY_COUNT + 1), (throwable, i) -> Pair.create(i, throwable))
    .flatMap(pair -> {
        if(pair.first > MAX_RETRY_COUNT) throw new RuntimeException(pair.second);
        System.out.println("delay retry by " + pair.first + " second(s)");
        return Observable.timer(pair.first, TimeUnit.SECONDS);
    });