可以将文章内容翻译成中文,广告屏蔽插件可能会导致该功能失效(如失效,请关闭广告屏蔽插件后再试):
问题:
In the documentation for RetryWhen the example there goes like this:
Observable.create((Subscriber<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
System.out.println("delay retry by " + i + " second(s)");
return Observable.timer(i, TimeUnit.SECONDS);
});
}).toBlocking().forEach(System.out::println);
But how do I propagate the Error if the retries runs out?
Adding .doOnError(System.out::println)
after the retryWhen
clause does not catch the error. Is it even emitted?
Adding a .doOnError(System.out::println)
before retryWhen displays always fails
for all retries.
回答1:
The doc for retryWhen
says that it passes onError
notification to its subscribers and terminates. So you can do something like this:
final int ATTEMPTS = 3;
Observable.create((Subscriber<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> attempts
.zipWith(Observable.range(1, ATTEMPTS), (n, i) ->
i < ATTEMPTS ?
Observable.timer(i, SECONDS) :
Observable.error(n))
.flatMap(x -> x))
.toBlocking()
.forEach(System.out::println);
回答2:
The Javadoc for retryWhen
states that:
If that Observable calls onComplete or onError then retry will call
onCompleted or onError on the child subscription.
Put simply, if you want to propagate the exception, you'll need to rethrow the original exception once you've had enough retrying.
An easy way is to set your Observable.range
to be 1 greater than the number of times you want to retry.
Then in your zip function test the current number of retries. If it's equal to NUMBER_OF_RETRIES + 1, return Observable.error(throwable)
or re-throw your exception.
EG
Observable.create((Subscriber<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
return attempts.zipWith(Observable.range(1, NUMBER_OF_RETRIES + 1), (throwable, attempt) -> {
if (attempt == NUMBER_OF_RETRIES + 1) {
throw Throwables.propagate(throwable);
}
else {
return attempt;
}
}).flatMap(i -> {
System.out.println("delaying retry by " + i + " second(s)");
return Observable.timer(i, TimeUnit.SECONDS);
});
}).toBlocking().forEach(System.out::println);
As an aside doOnError
does not affect the Observable in any way - it simply provides you with a hook to perform some action if an error occurs. A common example is logging.
回答3:
One option is using Observable.materialize()
to convert Observable.range()
items into notifications. Then once onCompleted()
is issued, one can propagate error downstream (in sample below Pair
is used to wrap Observable.range()
notifications and exception from Observable
)
@Test
public void retryWhen() throws Exception {
Observable.create((Subscriber<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
return attempts.zipWith(Observable.range(1, 3).materialize(), Pair::new)
.flatMap(notifAndEx -> {
System.out.println("delay retry by " + notifAndEx + " second(s)");
return notifAndEx.getRight().isOnCompleted()
? Observable.<Integer>error(notifAndEx.getLeft())
: Observable.timer(notifAndEx.getRight().getValue(), TimeUnit.SECONDS);
});
}).toBlocking().forEach(System.out::println);
}
private static class Pair<L,R> {
private final L left;
private final R right;
public Pair(L left, R right) {
this.left = left;
this.right = right;
}
public L getLeft() {
return left;
}
public R getRight() {
return right;
}
}
回答4:
You can get the behaviour you want using the RetryWhen
builder in rxjava-extras which is on Maven Central. Use the latest version.
Observable.create((Subscriber<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
})
.retryWhen(RetryWhen
.delays(Observable.range(1, 3)
.map(n -> (long) n),
TimeUnit.SECONDS).build())
.doOnError(e -> e.printStackTrace())
.toBlocking().forEach(System.out::println);
回答5:
You need to use onErrorResumeNext after the retryWhen
In your example
Observable.create((Subscriber<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
return attempts.zipWith(Observable.range(1, NUMBER_OF_RETRIES + 1), (n, i) -> {
if (i == NUMBER_OF_RETRIES + 1) {
throw Throwables.propagate(n);
}
else {
return i;
}
}).flatMap(i -> {
System.out.println("delay retry by " + i + " second(s)");
return Observable.timer(i, TimeUnit.SECONDS);
});
})
.onErrorResumeNext(t -> {System.out.println("Error after all retries:" + t.getMessage());
return Observable.error(t);
})
.toBlocking().forEach(System.out::println);
At the bottom of this class you can see a practical example to understand how works. https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/errors/ObservableExceptions.java
回答6:
You can either use scan function, which returns a pair with accumulated index and decide whether or not to pass on the error:
.retryWhen(attempts ->
return .scan(Pair.create(0, null), (index, value) -> Pair.create(index.first + 1, value))
.flatMap(pair -> {
if(pair.first > MAX_RETRY_COUNT) {
throw new RuntimeException(pair.second);
}
return Observable.timer(pair.first, TimeUnit.SECONDS);
});
Or you can stick with zipWith
operator but increase the number in range
Observable and return a pair, instead of the index alone. That way, you won't loose the information about previous throwable
.
attempts
.zipWith(Observable.range(1, MAX_RETRY_COUNT + 1), (throwable, i) -> Pair.create(i, throwable))
.flatMap(pair -> {
if(pair.first > MAX_RETRY_COUNT) throw new RuntimeException(pair.second);
System.out.println("delay retry by " + pair.first + " second(s)");
return Observable.timer(pair.first, TimeUnit.SECONDS);
});