Java8 CompletableFuture recoverWith equivalent? eg

Posted 2019-01-15 09:17

Question:

I don't see an obvious way to recover from an exception with an asynchronous result, for example to retry an async operation. I would expect something like the code below to work, but handleAsync doesn't do what you might think: it only runs the callback asynchronously on another thread. Returning a CompletionStage from the callback is not correct, because it is not flattened into the result. Jeopardy question of the day: thenApply is to exceptionally as thenCompose is to what?

    CompletionStage<String> cf = askPong("cause error").handleAsync((x, t) -> {
        if (t != null) {
            return askPong("Ping");
        } else {
            return x;
        }
    });

Where askPong asks an actor:

public CompletionStage<String> askPong(String message){
    Future sFuture = ask(actorRef, message, 1000);
    final CompletionStage<String> cs = toJava(sFuture);
    return cs;
} 

Answer 1:

Is this what you are looking for?

askPong("cause error")
        .handle( (pong, ex) -> ex == null 
                ? CompletableFuture.completedFuture(pong) 
                : askPong("Ping")
        ).thenCompose(x -> x);
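
To try the pattern above without an actor system, here is a minimal runnable sketch. The askPong below is a hypothetical stand-in (it answers "Ping" with "Pong" and fails on anything else), and askWithRetry is just an illustrative name. The intermediate variable is there to make the nested type visible before thenCompose flattens it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class RecoverWithDemo {

    // Hypothetical stand-in for the actor-backed askPong in the question:
    // "Ping" completes with "Pong", any other message fails.
    static CompletionStage<String> askPong(String message) {
        CompletableFuture<String> result = new CompletableFuture<>();
        if ("Ping".equals(message)) {
            result.complete("Pong");
        } else {
            result.completeExceptionally(
                    new IllegalArgumentException("unknown message: " + message));
        }
        return result;
    }

    // Illustrative helper: try `first`, and on failure ask again with `fallback`.
    static CompletionStage<String> askWithRetry(String first, String fallback) {
        // handle maps both outcomes to a stage, so the result is nested.
        CompletionStage<CompletionStage<String>> nested =
                askPong(first).handle((pong, ex) -> ex == null
                        ? CompletableFuture.completedFuture(pong)
                        : askPong(fallback));
        // thenCompose with the identity function flattens the nesting.
        return nested.thenCompose(stage -> stage);
    }

    public static void main(String[] args) {
        String reply = askWithRetry("cause error", "Ping")
                .toCompletableFuture()
                .join();
        System.out.println(reply); // prints "Pong"
    }
}
```

On Java 12 and later, CompletionStage also has exceptionallyCompose, which does this in a single call. On Java 8 the handle plus thenCompose pair is the usual workaround.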

Also, do not use the ...Async methods unless you intend for the body of the supplied function to be executed asynchronously. So when you do something like

.handleAsync((x, t) -> {
    if (t != null) {
        return askPong("Ping");
    } else {
        return x;
    }
})

You are asking for the if-then-else to be run in a separate thread. Since askPong already returns an asynchronous CompletionStage, there's probably no reason to run it asynchronously.
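
A small sketch of the difference, using an already-completed future so the behaviour is easy to observe. The class and method names are made up for illustration. With plain handle the callback runs on the calling thread here, while handleAsync hands it to the default async executor (normally the common ForkJoinPool).

```java
import java.util.concurrent.CompletableFuture;

class HandleThreadDemo {

    // Name of the thread that runs a plain handle callback
    // on a future that is already complete.
    static String threadOfHandle() {
        return CompletableFuture.completedFuture("Pong")
                .handle((x, t) -> Thread.currentThread().getName())
                .join();
    }

    // Name of the thread that runs the same callback via handleAsync.
    static String threadOfHandleAsync() {
        return CompletableFuture.completedFuture("Pong")
                .handleAsync((x, t) -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        System.out.println("caller:      " + Thread.currentThread().getName());
        System.out.println("handle:      " + threadOfHandle());
        System.out.println("handleAsync: " + threadOfHandleAsync());
    }
}
```

The exact worker thread name depends on the JVM and executor. For a future that is not yet complete, the plain handle callback runs on whichever thread completes it.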



Answer 2:

After a lot of frustration in trying to figure out the proper way of doing Scala's recoverWith in Java 8, I ended up just writing my own. I still don't know if this is the best approach, but I created something like:

public RecoveryChainAsync<T> recoverWith(Function<Throwable, CompletableFuture<T>> fn);

With repeated calls to recoverWith, I queue up the functions inside the recovery chain and implement the recovery flow myself with "handle". RecoveryChainAsync.getCompletableFuture() then returns a representative CompletableFuture for the entire chain. Hope this helps.
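
For readers who want something concrete, here is a rough sketch of how such a chain could look. This is not the original implementation: only recoverWith and getCompletableFuture are named in the answer, and the constructor, the field names, and the demo in main are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class RecoveryChainAsync<T> {

    private final CompletableFuture<T> initial; // assumed: the chain wraps one starting future
    private final List<Function<Throwable, CompletableFuture<T>>> recoveries = new ArrayList<>();

    RecoveryChainAsync(CompletableFuture<T> initial) {
        this.initial = initial;
    }

    // Queue a recovery function; nothing runs until getCompletableFuture() is called.
    public RecoveryChainAsync<T> recoverWith(Function<Throwable, CompletableFuture<T>> fn) {
        recoveries.add(fn);
        return this;
    }

    // Build one future for the whole chain: each recovery runs only if
    // everything before it has failed.
    public CompletableFuture<T> getCompletableFuture() {
        CompletableFuture<T> current = initial;
        for (Function<Throwable, CompletableFuture<T>> fn : recoveries) {
            CompletableFuture<CompletableFuture<T>> nested =
                    current.handle((value, ex) -> ex == null
                            ? CompletableFuture.completedFuture(value)
                            : fn.apply(ex)); // ex may be wrapped in a CompletionException
            current = nested.thenCompose(stage -> stage);
        }
        return current;
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("first attempt failed"));

        String reply = new RecoveryChainAsync<String>(failed)
                .recoverWith(ex -> CompletableFuture.completedFuture("Pong"))
                .getCompletableFuture()
                .join();
        System.out.println(reply); // prints "Pong"
    }
}
```

Recoveries after the first one may receive the failure wrapped in a CompletionException, so a real implementation would probably unwrap the cause before passing it on.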