This code is quoted from Java 8 in Action, section 11.4.3.
    public Stream<CompletableFuture<String>> findPricesStream(String product) {
        return shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenCompose(quote ->
                CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
    }
Alongside the code, the author includes a figure indicating that applyDiscount() runs in the same thread as getPrice(), which I strongly doubt: there are two different calls with the Async suffix here, which means the second call may well run in another thread.
I tested it locally with the following code:
    private static void testBasic() {
        out.println("*****************************************");
        out.println("********** TESTING thenCompose **********");
        CompletableFuture[] futures = IntStream.rangeClosed(0, LEN).boxed()
            .map(i -> CompletableFuture.supplyAsync(() -> runStage1(i), EXECUTOR_SERVICE))
            .map(future -> future.thenCompose(i ->
                CompletableFuture.supplyAsync(() -> runStage2(i), EXECUTOR_SERVICE)))
            .toArray(size -> new CompletableFuture[size]);
        CompletableFuture.allOf(futures).join();
    }
The output seems to support my thinking. Is that correct?
*****************************************
********** TESTING thenCompose **********
Start: stage - 1 - value: 0 - thread name: pool-1-thread-1
Start: stage - 1 - value: 1 - thread name: pool-1-thread-2
Start: stage - 1 - value: 2 - thread name: pool-1-thread-3
Start: stage - 1 - value: 3 - thread name: pool-1-thread-4
Finish: stage - 1 - value: 3 - thread name: pool-1-thread-4 - time cost: 1520
Start: stage - 2 - value: 3 - thread name: pool-1-thread-5
Finish: stage - 1 - value: 0 - thread name: pool-1-thread-1 - time cost: 1736
Start: stage - 2 - value: 0 - thread name: pool-1-thread-6
Finish: stage - 1 - value: 2 - thread name: pool-1-thread-3 - time cost: 1761
Start: stage - 2 - value: 2 - thread name: pool-1-thread-7
Finish: stage - 2 - value: 2 - thread name: pool-1-thread-7 - time cost: 446
Finish: stage - 1 - value: 1 - thread name: pool-1-thread-2 - time cost: 2249
Start: stage - 2 - value: 1 - thread name: pool-1-thread-8
Finish: stage - 2 - value: 3 - thread name: pool-1-thread-5 - time cost: 828
Finish: stage - 2 - value: 0 - thread name: pool-1-thread-6 - time cost: 704
Finish: stage - 2 - value: 1 - thread name: pool-1-thread-8 - time cost: 401
Is Java 8 in Action wrong about this?
Thank you, @Holger. You made it crystal clear to me which thread executes the async and non-async methods, especially after I checked the specification, which further supports your point:
Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.
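As a first note, that code obscures what is happening by splitting the chain into several Stream operations unnecessarily.
Further, there is no sense in doing thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)) instead of thenApplyAsync(quote -> Discount.applyDiscount(quote), executor).
So, a simpler example doing the same would be: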
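A minimal sketch of such a simplified chain might look like the following. The book's Shop, Quote and Discount types are replaced here by hypothetical static helpers (getPrice, parseQuote, applyDiscount) with made-up pricing logic, purely so that the example is self-contained; only the shape of the chain is the point.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class SimplifiedPipeline {
    // Hypothetical stand-in for shop.getPrice(product): returns "shop:priceInCents".
    static String getPrice(String shop, String product) {
        return shop + ":" + (shop.length() + product.length()) * 100;
    }

    // Hypothetical stand-in for Quote::parse: extracts the price in cents.
    static Integer parseQuote(String raw) {
        return Integer.valueOf(raw.substring(raw.indexOf(':') + 1));
    }

    // Hypothetical stand-in for Discount.applyDiscount: takes 10% off.
    static Integer applyDiscount(Integer cents) {
        return cents * 90 / 100;
    }

    static List<Integer> findPrices(List<String> shops, String product, ExecutorService executor) {
        // One map() per shop: the whole chain is built in a single expression.
        List<CompletableFuture<Integer>> futures = shops.stream()
            .map(shop -> CompletableFuture
                .supplyAsync(() -> getPrice(shop, product), executor)
                .thenApply(SimplifiedPipeline::parseQuote)
                // thenApplyAsync submits the function to the executor,
                // replacing thenCompose + supplyAsync.
                .thenApplyAsync(SimplifiedPipeline::applyDiscount, executor))
            .collect(Collectors.toList());
        // Collect first, join afterwards, so that all shops are queried concurrently.
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            System.out.println(findPrices(Arrays.asList("ShopA", "BestShop"), "phone", executor));
        } finally {
            executor.shutdown();
        }
    }
}
```

As in the original code, nothing here ties the discount step to the thread that fetched the price; it is merely submitted to the same executor.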
However, you are right, there is no guarantee that getPrice and applyDiscount run in the same thread, unless the executor is a single-threaded executor.
You may interpret “executor thread” as “one of the executor’s threads”, but even then, there is a dangerously wrong point in the diagram, namely “new Quote(price)”, which apparently actually means “Quote::parse”. That step does not belong to the right side, as the actual thread evaluating the function passed to thenApply is unspecified. It may be one of the executor’s threads upon completion of the previous stage, but it may also be “your thread” right when calling thenApply, e.g. if the asynchronous operation managed to complete in between.
CompletableFuture offers no way to enforce the use of the first stage’s completing thread for the dependent actions.
Unless you use simple sequential code instead, of course:
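A sketch of what such sequential code could look like, assuming hypothetical stand-ins (getPrice, parseQuote, applyDiscount) for the book's methods: all three steps sit inside a single supplyAsync task, so they necessarily run on whichever executor thread picks up that task. The thread names are recorded only to make this observable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SequentialStage {
    // Hypothetical stand-ins for shop.getPrice, Quote.parse and Discount.applyDiscount.
    static String getPrice(String product) { return product + ":1000"; }
    static Integer parseQuote(String raw) { return Integer.valueOf(raw.substring(raw.indexOf(':') + 1)); }
    static Integer applyDiscount(Integer cents) { return cents * 90 / 100; }

    // Runs all three steps in one task and returns the thread name seen at each step.
    static String[] runSequentially(String product, ExecutorService executor) {
        String[] threads = new String[3];
        CompletableFuture.supplyAsync(() -> {
            threads[0] = Thread.currentThread().getName();
            String raw = getPrice(product);
            threads[1] = Thread.currentThread().getName();
            Integer quote = parseQuote(raw);
            threads[2] = Thread.currentThread().getName();
            return applyDiscount(quote);
        }, executor).join(); // join() makes the writes to threads[] visible to the caller
        return threads;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            for (String name : runSequentially("phone", executor)) {
                System.out.println(name);
            }
        } finally {
            executor.shutdown();
        }
    }
}
```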
Then, the picture of a linear thread on the right hand side will be correct.
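The remark that the function passed to thenApply may run in “your thread” can be observed with a small sketch. It relies on behavior of current OpenJDK implementations rather than on a documented guarantee: when the first stage has already completed by the time thenApply is called, the function is evaluated immediately by the calling thread. The class and method names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThenApplyThread {
    // Returns { thread of the first stage, thread of the thenApply function, calling thread }.
    static String[] observe(ExecutorService executor) {
        CompletableFuture<String> first =
            CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), executor);
        first.join(); // wait, so the first stage is certainly complete before chaining

        // Non-async dependent action registered on an already completed future.
        CompletableFuture<String> dependent =
            first.thenApply(ignored -> Thread.currentThread().getName());

        return new String[] { first.join(), dependent.join(), Thread.currentThread().getName() };
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            String[] names = observe(executor);
            System.out.println("first stage ran on: " + names[0]);
            System.out.println("thenApply ran on:   " + names[1]);
            System.out.println("calling thread:     " + names[2]);
        } finally {
            executor.shutdown();
        }
    }
}
```

Without the intermediate join(), the dependent function could just as well end up on the executor's thread, which is exactly why the diagram cannot assign that step to one side.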