Retry logic with CompletableFuture

Posted 2020-05-19 05:48

I need to submit a task in an async framework I'm working on, but I need to catch exceptions and retry the same task multiple times before aborting.

The code I'm working with is:

int retries = 0;
public CompletableFuture<Result> executeActionAsync() {

    // Execute the action async and get the future
    CompletableFuture<Result> f = executeMycustomActionHere();

    // If the future completes with exception:
    f.exceptionally(ex -> {
        retries++; // Increment the retry count
        if (retries < MAX_RETRIES)
            return executeActionAsync();  // <--- Submit one more time

        // Abort with a null value
        return null;
    });

    // Return the future    
    return f;
}

This currently doesn't compile because the return type of the lambda is wrong: it expects a Result, but the executeActionAsync returns a CompletableFuture<Result>.

How can I implement this fully async retry logic?

6 answers
迷人小祖宗
#2 · 2020-05-19 06:07

Instead of implementing your own retry logic, I recommend using a proven library like failsafe, which has built-in support for futures (and seems more popular than guava-retrying). For your example, it would look something like:

private static RetryPolicy retryPolicy = new RetryPolicy()
    .withMaxRetries(MAX_RETRIES);

public CompletableFuture<Result> executeActionAsync() {
    return Failsafe.with(retryPolicy)
        .with(executor)
        .withFallback(null)
        .future(this::executeMycustomActionHere);
}

You should probably avoid .withFallback(null) and just let the returned future's .get() method throw the resulting exception so the caller of your method can handle it specifically, but that's a design decision you'll have to make.

Other things to think about include whether you should retry immediately or wait some period of time between attempts, any sort of recursive backoff (useful when you're calling a web service that might be down), and whether there are specific exceptions that aren't worth retrying (e.g. if the parameters to the method are invalid).
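
As a rough illustration of the delay and non-retryable-exception points above, here is a minimal sketch that uses plain CompletableFuture rather than Failsafe. The names BackoffRetry, retryWithBackoff and the retryable predicate are made up for this example, the delay simply doubles after each failure (one common form of backoff), and CompletableFuture.delayedExecutor requires Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class BackoffRetry {

    /**
     * Hypothetical helper: retries the action up to maxRetries times, doubling the delay
     * after each failure, and gives up at once when the failure is not retryable.
     */
    public static <T> CompletableFuture<T> retryWithBackoff(
            Supplier<CompletableFuture<T>> action, int maxRetries,
            long delayMillis, Predicate<Throwable> retryable) {
        return action.get()
                .thenApply(CompletableFuture::completedFuture)
                .exceptionally(t -> {
                    Throwable cause = unwrap(t);
                    if (maxRetries <= 0 || !retryable.test(cause)) {
                        return failed(cause); // out of retries, or not worth retrying
                    }
                    // Java 9+: an executor that starts its tasks only after the delay has elapsed
                    Executor later = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
                    CompletableFuture<CompletableFuture<T>> next = CompletableFuture.supplyAsync(
                            () -> retryWithBackoff(action, maxRetries - 1, delayMillis * 2, retryable),
                            later);
                    return next.thenCompose(Function.identity());
                })
                .thenCompose(Function.identity());
    }

    // Dependent stages report failures wrapped in CompletionException; look at the real cause
    private static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    // Java 8 compatible stand-in for CompletableFuture.failedFuture
    private static <T> CompletableFuture<T> failed(Throwable t) {
        CompletableFuture<T> f = new CompletableFuture<>();
        f.completeExceptionally(t);
        return f;
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // Stand-in for the real action: fails twice, then succeeds
        Supplier<CompletableFuture<String>> flaky = () -> CompletableFuture.supplyAsync(() -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("service unavailable");
            }
            return "done after " + attempts.get() + " attempts";
        });
        String result = retryWithBackoff(flaky, 5, 100, t -> t instanceof IllegalStateException).join();
        System.out.println(result);
    }
}
```

No thread is blocked while waiting between attempts. A library such as Failsafe is still likely the better choice once you need jitter, maximum delays or richer policies.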

Anthone
#3 · 2020-05-19 06:14

util class:

public class RetryUtil {

    public static <R> CompletableFuture<R> retry(Supplier<CompletableFuture<R>> supplier, int maxRetries) {
        CompletableFuture<R> f = supplier.get();
        for(int i=0; i<maxRetries; i++) {
            f=f.thenApply(CompletableFuture::completedFuture)
                .exceptionally(t -> {
                    System.out.println("retry for: "+t.getMessage());
                    return supplier.get();
                })
                .thenCompose(Function.identity());
        }
        return f;
    }
}

usage:

public CompletableFuture<String> lucky(){
    return CompletableFuture.supplyAsync(()->{
        double luckNum = Math.random();
        double luckEnough = 0.6;
        if(luckNum < luckEnough){
            throw new RuntimeException("not luck enough: " + luckNum);
        }
        return "I'm lucky: "+luckNum;
    });
}
@Test
public void testRetry(){
    CompletableFuture<String> retry = RetryUtil.retry(this::lucky, 10);
    System.out.println("async check");
    String join = retry.join();
    System.out.println("lucky? "+join);
}

output

async check
retry for: java.lang.RuntimeException: not luck enough: 0.412296354211683
retry for: java.lang.RuntimeException: not luck enough: 0.4099777199676573
lucky? I'm lucky: 0.8059089479049389
Emotional °昔
#4 · 2020-05-19 06:15

I recently solved a similar problem using the guava-retrying library.

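// Assumes executeMycustomActionHere() is a blocking variant that returns Result directly
Callable<Result> callable = new Callable<Result>() {
    public Result call() throws Exception {
        return executeMycustomActionHere();
    }
};

// The retryer's type parameter must match the callable's result type
Retryer<Result> retryer = RetryerBuilder.<Result>newBuilder()
        .retryIfResult(Predicates.<Result>isNull())
        .retryIfExceptionOfType(IOException.class)
        .retryIfRuntimeException()
        .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRIES))
        .build();

CompletableFuture<Result> future = CompletableFuture.supplyAsync(() -> {
    try {
        // Blocks this pool thread until an attempt succeeds or the stop strategy gives up
        return retryer.call(callable);
    } catch (RetryException | ExecutionException e) {
        // Complete the future exceptionally instead of swallowing the failure
        throw new CompletionException(e);
    }
});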
爷的心禁止访问
#5 · 2020-05-19 06:18

I think I succeeded. Here's an example class I created and the test code:


RetriableTask.java

public class RetriableTask
{
    protected static final int MAX_RETRIES = 10;
    protected int retries = 0;
    protected int n = 0;
    protected CompletableFuture<Integer> future = new CompletableFuture<Integer>();

    public RetriableTask(int number) {
        n = number;
    }

    public CompletableFuture<Integer> executeAsync() {
        // Create a failure within variable timeout
        Duration timeoutInMilliseconds = Duration.ofMillis(1*(int)Math.pow(2, retries));
        CompletableFuture<Integer> timeoutFuture = Utils.failAfter(timeoutInMilliseconds);

        // Create a dummy future and complete only if (n > 5 && retries > 5) so we can test for both completion and timeouts. 
        // In real application this should be a real future
        final CompletableFuture<Integer> taskFuture = new CompletableFuture<>();
        if (n > 5 && retries > 5)
            taskFuture.complete(retries * n);

        // Attach the failure future to the task future, and perform a check on completion
        taskFuture.applyToEither(timeoutFuture, Function.identity())
            .whenCompleteAsync((result, exception) -> {
                if (exception == null) {
                    future.complete(result);
                } else {
                    retries++;
                    if (retries >= MAX_RETRIES) {
                        future.completeExceptionally(exception);
                    } else {
                        executeAsync();
                    }
                }
            });

        // Return the future    
        return future;
    }
}

Usage

int size = 10;
System.out.println("generating...");
List<RetriableTask> tasks = new ArrayList<>();
for (int i = 0; i < size; i++) {
    tasks.add(new RetriableTask(i));
}

System.out.println("issuing...");
List<CompletableFuture<Integer>> futures = new ArrayList<>();
for (int i = 0; i < size; i++) {
    futures.add(tasks.get(i).executeAsync());
}

System.out.println("Waiting...");
for (int i = 0; i < size; i++) {
    try {
        CompletableFuture<Integer> future = futures.get(i);
        int result = future.get();
        System.out.println(i + " result is " + result);
    } catch (Exception ex) {
        System.out.println(i + " I got exception!");
    }
}
System.out.println("Done waiting...");

Output

generating...
issuing...
Waiting...
0 I got exception!
1 I got exception!
2 I got exception!
3 I got exception!
4 I got exception!
5 I got exception!
6 result is 36
7 result is 42
8 result is 48
9 result is 54
Done waiting...

Main idea and some glue code (failAfter function) come from here.
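
The answer does not show Utils.failAfter. A minimal sketch of such a helper, assuming it returns a future that fails with a TimeoutException once the given duration has passed, might look like this. The scheduler field, the thread name and the exception message are assumptions for illustration, not the code the answer links to.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

public class Utils {

    // A single daemon thread is enough: it only fires timers and never runs user code
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "fail-after-timer");
                thread.setDaemon(true);
                return thread;
            });

    /** Returns a future that never succeeds and fails with TimeoutException after the duration. */
    public static <T> CompletableFuture<T> failAfter(Duration duration) {
        CompletableFuture<T> promise = new CompletableFuture<>();
        SCHEDULER.schedule(() -> {
            TimeoutException ex = new TimeoutException("Timeout after " + duration);
            return promise.completeExceptionally(ex);
        }, duration.toMillis(), TimeUnit.MILLISECONDS);
        return promise;
    }

    public static void main(String[] args) {
        // A task that never completes, raced against a 50 ms timeout
        CompletableFuture<Integer> never = new CompletableFuture<>();
        CompletableFuture<Integer> timeout = failAfter(Duration.ofMillis(50));
        try {
            never.applyToEither(timeout, Function.identity()).join();
        } catch (CompletionException e) {
            System.out.println("failed with: " + e.getCause());
        }
    }
}
```

On Java 9 and later, CompletableFuture.orTimeout offers similar behaviour without a hand-written scheduler.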

Any other suggestions or improvements are welcome.

The star\"
#6 · 2020-05-19 06:23

Chaining subsequent retries can be straightforward:

public CompletableFuture<Result> executeActionAsync() {
    CompletableFuture<Result> f=executeMycustomActionHere();
    for(int i=0; i<MAX_RETRIES; i++) {
        f=f.exceptionally(t -> executeMycustomActionHere().join());
    }
    return f;
}

This simply chains as many retries as intended, since these subsequent stages won’t do anything in the non-exceptional case. Read about the drawbacks below.

One drawback is that if the first attempt fails immediately, so that f is already completed exceptionally when the first exceptionally handler is chained, the action is invoked by the calling thread, which removes the asynchronous nature of the request entirely. More generally, join() may block a thread (the default executor will then start a new compensation thread, but it is still discouraged). Unfortunately, before Java 12 there is neither an exceptionallyAsync nor an exceptionallyCompose method.

A solution not invoking join() would be

public CompletableFuture<Result> executeActionAsync() {
    CompletableFuture<Result> f=executeMycustomActionHere();
    for(int i=0; i<MAX_RETRIES; i++) {
        f=f.thenApply(CompletableFuture::completedFuture)
           .exceptionally(t -> executeMycustomActionHere())
           .thenCompose(Function.identity());
    }
    return f;
}

demonstrating how involved combining “compose” and an “exceptionally” handler is.
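
Java 12 added exceptionallyCompose (along with exceptionallyAsync and exceptionallyComposeAsync) to CompletableFuture, so on newer runtimes the same chaining can be written without the thenApply/thenCompose detour. A minimal sketch, where the class name ExceptionallyComposeRetry and the generic retry helper are invented for illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ExceptionallyComposeRetry {

    /** Chains up to maxRetries further attempts; each one runs only if the previous attempt failed. */
    public static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> action, int maxRetries) {
        CompletableFuture<T> f = action.get();
        for (int i = 0; i < maxRetries; i++) {
            // Java 12+: like exceptionally, but the handler returns a stage instead of a value
            f = f.exceptionallyCompose(t -> action.get());
        }
        return f;
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // Stand-in for executeMycustomActionHere(): fails twice, then succeeds
        Supplier<CompletableFuture<String>> flaky = () -> CompletableFuture.supplyAsync(() -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("attempt failed");
            }
            return "succeeded";
        });
        System.out.println(retry(flaky, 5).join() + " after " + attempts.get() + " attempts");
    }
}
```

As with the loop above, only the last exception is reported when every attempt fails.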

Further, if all retries fail, only the last exception is reported. A better solution should report the first exception, with the exceptions of the subsequent retries added as suppressed exceptions. Such a solution can be built by chaining a recursive call, as hinted by Gili’s answer; however, to use this idea for exception handling, we have to combine “compose” and “exceptionally” using the steps shown above:

public CompletableFuture<Result> executeActionAsync() {
    return executeMycustomActionHere()
        .thenApply(CompletableFuture::completedFuture)
        .exceptionally(t -> retry(t, 0))
        .thenCompose(Function.identity());
}
private CompletableFuture<Result> retry(Throwable first, int retry) {
    if(retry >= MAX_RETRIES) return CompletableFuture.failedFuture(first);
    return executeMycustomActionHere()
        .thenApply(CompletableFuture::completedFuture)
        .exceptionally(t -> { first.addSuppressed(t); return retry(first, retry+1); })
        .thenCompose(Function.identity());
}

CompletableFuture.failedFuture is a Java 9 method, but it would be trivial to add a Java 8 compatible backport to your code if needed:

public static <T> CompletableFuture<T> failedFuture(Throwable t) {
    final CompletableFuture<T> cf = new CompletableFuture<>();
    cf.completeExceptionally(t);
    return cf;
}
Lonely孤独者°
#7 · 2020-05-19 06:31

Here is an approach that will work for any CompletionStage subclass and does not return a dummy CompletableFuture that does nothing more than wait to get updated by other futures.

/**
 * Sends a request that may run as many times as necessary.
 *
 * @param request  a supplier that initiates an HTTP request
 * @param executor the Executor used to run the request
 * @return the server response
 */
public CompletionStage<Response> asyncRequest(Supplier<CompletionStage<Response>> request, Executor executor)
{
    return retry(request, executor, 0);
}

/**
 * Sends a request that may run as many times as necessary.
 *
 * @param request  a supplier that initiates an HTTP request
 * @param executor the Executor used to run the request
 * @param tries    the number of times the operation has been retried
 * @return the server response
 */
private CompletionStage<Response> retry(Supplier<CompletionStage<Response>> request, Executor executor, int tries)
{
    if (tries >= MAX_RETRIES)
        throw new CompletionException(new IOException("Request failed after " + MAX_RETRIES + " tries"));
    return request.get().thenComposeAsync(response ->
    {
        if (response.getStatusInfo().getFamily() != Response.Status.Family.SUCCESSFUL)
            return retry(request, executor, tries + 1);
        return CompletableFuture.completedFuture(response);
    }, executor);
}