CompletableFuture | thenApply vs thenCompose

2019-01-21 10:03发布

问题:

I can't get my head around the difference between thenApply() and thenCompose().

So, could someone provide a valid use case?

From the Java docs:

thenApply(Function<? super T,? extends U> fn)

Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function.

thenCompose(Function<? super T,? extends CompletionStage<U>> fn)

Returns a new CompletionStage that, when this stage completes normally, is executed with this stage as the argument to the supplied function.

I get that the 2nd argument of thenCompose extends the CompletionStage where thenApply does not.

Could someone provide an example in which case I have to use thenApply and when thenCompose?

回答1:

thenApply is used if you have a synchronous mapping function.

CompletableFuture<Integer> future = 
    CompletableFuture.supplyAsync(() -> 1)
                     .thenApply(x -> x+1);

thenCompose is used if you have an asynchronous mapping function (i.e. one that returns a CompletableFuture). It will then return a future with the result directly, rather than a nested future.

CompletableFuture<Integer> future = 
    CompletableFuture.supplyAsync(() -> 1)
                     .thenCompose(x -> CompletableFuture.supplyAsync(() -> x+1));


回答2:

The updated Javadocs in Java 9 will probably help understand it better:

thenApply

<U> CompletionStage<U> thenApply​(Function<? super T,? extends U> fn)

Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function.

This method is analogous to Optional.map and Stream.map.

See the CompletionStage documentation for rules covering exceptional completion.

thenCompose

<U> CompletionStage<U> thenCompose​(Function<? super T,? extends CompletionStage<U>> fn)

Returns a new CompletionStage that is completed with the same value as the CompletionStage returned by the given function.

When this stage completes normally, the given function is invoked with this stage's result as the argument, returning another CompletionStage. When that stage completes normally, the CompletionStage returned by this method is completed with the same value.

To ensure progress, the supplied function must arrange eventual completion of its result.

This method is analogous to Optional.flatMap and Stream.flatMap.

See the CompletionStage documentation for rules covering exceptional completion.



回答3:

I think the answered posted by @Joe C is misleading.

Let me try to explain the difference between thenApply and thenCompose with an example.

Let's suppose that we have 2 methods: getUserInfo(int userId) and getUserRating(UserInfo userInfo):

public CompletableFuture<UserInfo> userInfo = getUserInfo(userId)

public CompletableFuture<UserRating> getUserRating(UserInfo)

Both method return types are CompletableFuture.

We want to call getUserInfo() first, and on its completion, call getUserRating() with the resulting UserInfo.

On the completion of getUserInfo() method, let's try both thenApply and thenCompose. The difference is in the return types:

CompletableFuture<CompletableFuture<UserRating>> f =
    userInfo.thenApply(this::getUserRating);

CompletableFuture<UserRating> relevanceFuture =
    userInfo.thenCompose(this::getUserRating);

thenCompose() works like Scala's flatMap which flattens nested futures.

thenApply() returned the nested futures as they were, but thenCompose() flattened the nested CompletableFutures so that it is easier to chain more method calls to it.



回答4:

thenApply and thenCompose is called on a CompletableFuture and does something with its result by supplying a Function. thenApply and thenCompose both return a CompletableFuture as their own result, such that you can chain multiple thenApply or thenCompose, each one would have a Function doing something to the result of the last Function.

This Function sometimes needs to do something synchronously, and return a result, in which case thenApply should be used.

CompletableFuture.completedFuture(1)
    .thenApply((x)->x+1) // adding one to the result synchronously
    .thenApply((x)->System.println(x));

You may also do something asynchronous in this Function, and this asynchronous thing you do should return a CompletionStage. The next Function in the chain is not interested in getting a CompletionStage as input, but rather the result of that CompletionStage. So then you should use thenCompose.

// addOneAsync may be implemented by using another thread, or calling a remote method
// CompletableFuture<Integer> addOneAsync(int input);
CompletableFuture.completedFuture(1)
    .thenCompose((x)->addOneAsync(x)) // doing something asynchronous
    .thenApply((x)->System.println(x));

In Javascript, Promise.then can accept a function, that either returns a value or a Promise of a value. In Java because of type rules, the two functions have to be distinctly typed, ie. (Function<? super T,? extends U> fn and Function<? super T,? extends CompletionStage<U>> fn. (or that Java has to do some type checking to do something special if you return a CompletionStage, but they chose the former) The end result being, Promise.then is implemented in two parts thenApply and thenCompose.

You can also read my answer about thenApplyAsync if that is confusing to you.