How to use CompletableFuture.thenComposeAsync()?

Posted 2019-02-16 23:33

Given:

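import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class Test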
{
    public static void main(String[] args)
    {
        int nThreads = 1;
        Executor e = Executors.newFixedThreadPool(nThreads);

        CompletableFuture.runAsync(() ->
        {
            System.out.println("Task 1. Thread: " + Thread.currentThread().getId());
        }, e).thenComposeAsync((Void unused) ->
        {
            return CompletableFuture.runAsync(() ->
            {
                System.out.println("Task 2. Thread: " + Thread.currentThread().getId());
            }, e);
        }, e).join();
        System.out.println("finished");
    }
}

I am expecting a single executor thread to run task 1, followed by task 2. Instead, the code hangs if nThreads is less than 2.

  1. Please explain why the code hangs. I can see it is blocked in CompletableFuture:616 waiting for some Future to complete, but it's not clear why.
  2. If I allow the use of 2 threads, what is each thread being used for?

In short, please help me understand how thenComposeAsync() actually works. The Javadoc looks like it was written for robots instead of human beings :)

2 Answers
在下西门庆
#2 · 2019-02-16 23:41

It blocks on the runAsync call inside thenComposeAsync. thenComposeAsync runs the supplied function on a thread of executor e, but the function you gave it then submits the body of runAsync to that same executor. With only one thread, the thread running the function is occupied, so Task 2 never gets a thread to run on.

You can see better what's going on by adding another trace output:

CompletableFuture.runAsync(() -> {
    System.out.println("Task 1. Thread: " + Thread.currentThread().getId());
}, e).thenComposeAsync((Void unused) -> {
    System.out.println("Task 1 1/2. Thread: " + Thread.currentThread().getId());
    return CompletableFuture.runAsync(() -> {
        System.out.println("Task 2. Thread: " + Thread.currentThread().getId());
    }, e);
}, e).join();

Now if you run it with a 2-thread executor, you will see that Task 1 1/2 and Task 2 run on different threads.

The way to fix it is to replace thenComposeAsync with just regular thenCompose. I am not sure why you would ever use thenComposeAsync. If you have a method that returns a CompletableFuture, presumably that method doesn't block and doesn't need to be run asynchronously.
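As a minimal sketch of that fix, the version below chains the two tasks with thenCompose on a single-threaded executor. The class name ThenComposeDemo, the run() helper, and the order list are illustrative names that are not in the question; the bounded get() replaces join() only so that a stall would show up as a TimeoutException rather than a hang.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThenComposeDemo
{
    // Hypothetical helper: runs both tasks on one thread and returns
    // the labels in the order the tasks actually executed.
    static List<String> run() throws Exception
    {
        ExecutorService e = Executors.newFixedThreadPool(1);
        List<String> order = new CopyOnWriteArrayList<>();
        try
        {
            CompletableFuture.runAsync(() -> order.add("Task 1"), e)
                .thenCompose((Void unused) ->
                    // The function only submits Task 2 and returns its future.
                    // It does not wait, so the single thread is never held up.
                    CompletableFuture.runAsync(() -> order.add("Task 2"), e))
                .get(5, TimeUnit.SECONDS);
            return order;
        }
        finally
        {
            // Pool threads are non-daemon; shut down so the JVM can exit.
            e.shutdown();
        }
    }

    public static void main(String[] args) throws Exception
    {
        System.out.println(run()); // prints [Task 1, Task 2]
    }
}
```

Task 2 is only submitted after Task 1 has completed, so the order is fixed even though there is just one thread.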

Deceive 欺骗
#3 · 2019-02-16 23:54
  1. The thenComposeAsync method submits a new task to your executor; that task grabs the single thread and waits for your Task 2 to complete. But Task 2 has no thread left to run on. You can instead use the thenCompose method, which runs the function on the thread that completes Task 1 (or on the calling thread if Task 1 has already finished), to avoid the deadlock.

  2. One thread executes Task 1 and Task 2, and the second one runs the function passed to thenComposeAsync and waits for Task 2 to finish.
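The starvation described in point 1 can be reproduced by hand, independently of how thenComposeAsync is implemented. The sketch below is not the library's internal code; it just writes out the same pattern explicitly: an outer task occupies a pool thread and waits for an inner task queued on the same executor. The class name StarvationSketch and the method outerTimesOut are hypothetical, and the timeouts are arbitrary values chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StarvationSketch
{
    // Returns true if the outer task gave up waiting for the inner task.
    static boolean outerTimesOut(int nThreads) throws Exception
    {
        ExecutorService e = Executors.newFixedThreadPool(nThreads);
        try
        {
            CompletableFuture<Boolean> outer = CompletableFuture.supplyAsync(() ->
            {
                // The inner task is queued on the same executor the outer task runs on.
                CompletableFuture<Void> inner = CompletableFuture.runAsync(() -> { }, e);
                try
                {
                    // Blocking here keeps this pool thread occupied.
                    inner.get(1, TimeUnit.SECONDS);
                    return false;
                }
                catch (TimeoutException ex)
                {
                    return true; // the inner task never got a thread
                }
                catch (InterruptedException | ExecutionException ex)
                {
                    throw new IllegalStateException(ex);
                }
            }, e);
            return outer.get(5, TimeUnit.SECONDS);
        }
        finally
        {
            e.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception
    {
        System.out.println("1 thread, timed out: " + outerTimesOut(1)); // true
        System.out.println("2 threads, timed out: " + outerTimesOut(2));
    }
}
```

With one thread the inner task can never start while the outer task is waiting for it; with two threads the second thread is free to pick it up.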

Note: CompletableFutures work best with a ForkJoinPool, which is more efficient at processing tasks that spawn new tasks. The common pool (ForkJoinPool.commonPool()) was added in Java 8 for this purpose and is used by default if you don't specify an executor to run your tasks.
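A small sketch for checking where a task lands when no executor is passed. The class name DefaultExecutorSketch and the method ranOnForkJoinWorker are illustrative. One caveat from the CompletableFuture Javadoc: if the common pool does not support a parallelism level of at least two, a new thread is created per task instead, so the result depends on the machine and no output is shown here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

public class DefaultExecutorSketch
{
    // Runs a task without an explicit executor and reports whether
    // it executed on a ForkJoinPool worker thread.
    static boolean ranOnForkJoinWorker()
    {
        return CompletableFuture
            .supplyAsync(() -> Thread.currentThread() instanceof ForkJoinWorkerThread)
            .join();
    }

    public static void main(String[] args)
    {
        System.out.println("Common pool parallelism: "
            + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("Ran on a ForkJoinWorkerThread: " + ranOnForkJoinWorker());
    }
}
```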

Here is a good presentation about where these new features shine and how they work: Reactive Programming Patterns with Java 8 Futures.
