CompletableFuture — Aggregate Future to Fail Fast

Posted 2020-07-10 11:09

Question:

I have been using the CompletableFuture.allOf(...) helper to create aggregate futures that only become "done" once all of their constituent futures have completed, e.g.:

CompletableFuture<?> future1 = new CompletableFuture<>();
CompletableFuture<?> future2 = new CompletableFuture<>();
CompletableFuture<?> future3 = new CompletableFuture<>();

CompletableFuture<?> future = CompletableFuture.allOf(future1, future2, future3);

I would like a slight variation on this functionality, where the aggregate future is marked as complete when:

  • All futures have completed successfully OR
  • Any one future has completed unsuccessfully

In the latter case, the aggregate future should complete (exceptionally) immediately, and not have to wait for the other futures to complete, i.e. to fail-fast.

To illustrate this in contrast to CompletableFuture.allOf(...) consider this:

// First future completed, gotta wait for the rest of them...
future1.complete(null);
System.out.println("Future1 Complete, aggregate status: " + future.isDone());

// Second future was erroneous! I'd like the aggregate to now be completed with failure
future2.completeExceptionally(new Exception());
System.out.println("Future2 Complete, aggregate status: " + future.isDone());

// Finally complete the third future, that will mark the aggregate as done
future3.complete(null);
System.out.println("Future3 Complete, aggregate status: " + future.isDone());

Using allOf(...), this code yields:

Future1 Complete, aggregate status: false
Future2 Complete, aggregate status: false
Future3 Complete, aggregate status: true

Whereas my alternative aggregate implementation would return "true" after Future2 was completed, given that it completed exceptionally.


I cannot find any utils in the Java standard library that will help me achieve this, which feels strange... given it's a relatively vanilla use-case.

Looking at the implementation of CompletableFuture.allOf(...), it's clear that the logic behind these scenarios is fairly complex. I'd be loath to write this myself, so I was wondering whether there are any alternatives.

Answer 1:

Although not as syntactically sweet as the CompletableFuture.allOf(...) method, it appears that thenCompose(...) may offer the solution:

CompletableFuture<?> future = future1.thenCompose((f) -> future2).thenCompose((f) -> future3);

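For the completion order used in the question, this will yield the desired output. Note that each thenCompose(...) stage only looks at the next future once the previous one has completed, so the result depends on the order in which the futures are chained: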

Future1 Complete, aggregate status: false
Future2 Complete, aggregate status: true
Future3 Complete, aggregate status: true
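
To see how the chained version depends on ordering, here is a minimal sketch that fails future3 first while future1 is still pending. The class and method names (ChainOrderDemo, failLastFutureFirst) are hypothetical and only for illustration, and the futures are typed as CompletableFuture<Void> to keep type inference simple:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; the name is not from the original post.
class ChainOrderDemo {

    // Builds the same thenCompose chain, but fails future3 while future1 is pending.
    // Returns the aggregate's isDone() status before and after the earlier futures complete.
    static boolean[] failLastFutureFirst() {
        CompletableFuture<Void> future1 = new CompletableFuture<>();
        CompletableFuture<Void> future2 = new CompletableFuture<>();
        CompletableFuture<Void> future3 = new CompletableFuture<>();

        CompletableFuture<Void> chained =
                future1.thenCompose(f -> future2).thenCompose(f -> future3);

        // The chain has not reached future3 yet, so this failure goes unnoticed for now.
        future3.completeExceptionally(new IllegalStateException("future3 failed"));
        boolean doneWhileFuture1Pending = chained.isDone();

        // Only once future1 and future2 complete does the chain reach future3.
        future1.complete(null);
        future2.complete(null);
        boolean doneAfterEarlierFutures = chained.isDone();

        return new boolean[] { doneWhileFuture1Pending, doneAfterEarlierFutures };
    }

    public static void main(String[] args) {
        boolean[] status = failLastFutureFirst();
        System.out.println("Future3 failed, aggregate status: " + status[0]);          // false
        System.out.println("Future1 and Future2 complete, aggregate status: " + status[1]); // true
    }
}
```

So the chain fails fast only when the failing future is the next one in line; a failure further down the chain is reported late.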

A helper method built on allOf(...) and anyOf(...) fails fast regardless of completion order, and offers some syntactic niceties to the caller:

private static CompletableFuture<?> composed(CompletableFuture<?> ... futures) {

    // Complete when ALL the underlying futures are completed
    CompletableFuture<?> allComplete = CompletableFuture.allOf(futures);

    // Complete when ANY of the underlying futures are exceptional
    CompletableFuture<?> anyException = new CompletableFuture<>();
    for (CompletableFuture<?> completableFuture : futures) {
        completableFuture.exceptionally((t) -> {
            anyException.completeExceptionally(t);
            return null;
        });
    }

    // Complete when either of the above are satisfied
    return CompletableFuture.anyOf(allComplete, anyException);
}

Allowing for:

CompletableFuture<?> future = composed(future1, future2, future3);
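
For anyone who wants to try this end to end, here is a self-contained sketch that copies the composed(...) helper into a small class and replays the completion sequence from the question. The class name FailFastDemo and the runScenario method are hypothetical additions for the demo:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; only composed(...) comes from the answer above.
class FailFastDemo {

    static CompletableFuture<?> composed(CompletableFuture<?>... futures) {
        // Completes when ALL the underlying futures are completed
        CompletableFuture<?> allComplete = CompletableFuture.allOf(futures);

        // Only ever completed exceptionally, by the first underlying future that fails
        CompletableFuture<?> anyException = new CompletableFuture<>();
        for (CompletableFuture<?> future : futures) {
            future.exceptionally(t -> {
                anyException.completeExceptionally(t);
                return null;
            });
        }

        // Completes when either of the above does
        return CompletableFuture.anyOf(allComplete, anyException);
    }

    // Replays the question's sequence and records isDone() after each step.
    static boolean[] runScenario() {
        CompletableFuture<Void> future1 = new CompletableFuture<>();
        CompletableFuture<Void> future2 = new CompletableFuture<>();
        CompletableFuture<Void> future3 = new CompletableFuture<>();
        CompletableFuture<?> aggregate = composed(future1, future2, future3);

        future1.complete(null);
        boolean afterFuture1 = aggregate.isDone();

        future2.completeExceptionally(new Exception("future2 failed"));
        boolean afterFuture2 = aggregate.isDone();

        future3.complete(null);
        boolean afterFuture3 = aggregate.isDone();

        return new boolean[] {
            afterFuture1, afterFuture2, afterFuture3, aggregate.isCompletedExceptionally()
        };
    }

    public static void main(String[] args) {
        boolean[] s = runScenario();
        System.out.println("Future1 Complete, aggregate status: " + s[0]); // false
        System.out.println("Future2 Complete, aggregate status: " + s[1]); // true
        System.out.println("Future3 Complete, aggregate status: " + s[2]); // true
        System.out.println("Aggregate failed: " + s[3]);                   // true
    }
}
```

The callbacks here run on whichever thread completes the underlying future, since no async variants are used.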


Answer 2:

You could maybe do this by creating both an allOf and an anyOf, and then combining those into a second anyOf.
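
One caveat worth checking when trying this: anyOf(...) completes as soon as any input completes, whether normally or exceptionally, so an anyOf over the original futures would also fire on the first success. A possible workaround, sketched below under that assumption, is to feed the inner anyOf with derived futures that can only ever fail. The names AnyOfCaveat, naive and failFast are hypothetical:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; the names are not from the original post.
class AnyOfCaveat {

    // Literal reading of the suggestion: completes on the first success as well.
    static CompletableFuture<Object> naive(CompletableFuture<?>... futures) {
        return CompletableFuture.anyOf(
                CompletableFuture.allOf(futures),
                CompletableFuture.anyOf(futures));
    }

    // Variant: the inner anyOf only sees futures that can complete exceptionally.
    static CompletableFuture<Object> failFast(CompletableFuture<?>... futures) {
        CompletableFuture<?>[] failuresOnly = new CompletableFuture<?>[futures.length];
        for (int i = 0; i < futures.length; i++) {
            // On success, switch to a future that never completes;
            // on failure, thenCompose propagates the exception.
            failuresOnly[i] = futures[i].thenCompose(v -> new CompletableFuture<Object>());
        }
        return CompletableFuture.anyOf(
                CompletableFuture.allOf(futures),
                CompletableFuture.anyOf(failuresOnly));
    }

    public static void main(String[] args) {
        CompletableFuture<Void> future1 = new CompletableFuture<>();
        CompletableFuture<Void> future2 = new CompletableFuture<>();
        CompletableFuture<Object> naive = naive(future1, future2);
        CompletableFuture<Object> failFast = failFast(future1, future2);

        future1.complete(null);
        System.out.println("naive done after first success: " + naive.isDone());       // true
        System.out.println("failFast done after first success: " + failFast.isDone()); // false

        future2.completeExceptionally(new IllegalStateException("future2 failed"));
        System.out.println("failFast failed after first failure: "
                + failFast.isCompletedExceptionally());                                // true
    }
}
```

The never-completing placeholder futures are a shortcut for the sketch; a version that completes a single shared "failure" future from each input, as in Answer 1, avoids creating them.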