-->

CompletableFuture is not getting executed. If I us

2020-02-11 06:46发布

问题:

I am trying to run the following class its getting terminated without executing the CompletableFuture.

public class ThenApplyExample {

public static void main(String[] args) throws Exception {
    //ExecutorService es = Executors.newCachedThreadPool();
    CompletableFuture<Student> studentCompletableFuture = CompletableFuture.supplyAsync(() -> {

        try {

            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 3;
    })// If I put executorservice created n commented above, programme work as expected.
            .thenApply(i -> {

                for (int j = 0; j <= i; j++) {
                    System.out.println("Inside first then apply");
                }
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("First then apply is finished");
                return ++i;
            })
            .thenApply(i -> {
                System.out.println("Inside 2nd then apply");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Inside 2nd then apply stopped");

                return i++;
            })
            .thenApply(i -> {
                System.out.println("Inside 3nd then apply");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Inside 3nd then apply stopped");
                return "The i is ::: " + i;
            })
            .thenApply(s -> Student.builder().id(1).name(s).address("Some address").build());
    System.out.println("Executing..");
    System.out.println("Executing..");
    System.out.println("Executing..");
    System.out.println("Executing..");
    System.out.println("Executing..");

    //es.shutdown();
}
} 

Output I am getting is

Executing..
Executing..
Executing..
Executing..
Executing..

Whereas expected output is

Executing..
Executing..
Executing..
Executing..
Executing..
Inside first then apply
Inside first then apply
Inside first then apply
Inside first then apply
First then apply is finished
Inside 2nd then apply
Inside 2nd then apply stopped
Inside 3nd then apply
Inside 3nd then apply stopped

Note : In the above programme, I am not using studentCompletableFuture.get(). I dont want to use it since it blocks the code.

If I add studentCompletableFuture.get() at the very end of the programme, it works as expected or if I add the executorservice in the supplyAsync 2nd argument(check comment in programme), it works again as expected.

My question is why it is terminating when programme uses the default ForkJoin common pool?

回答1:

The threads in the ForkJoinPool are daemon threads (at least by default). Here's the relevant Javadoc (emphasis mine):

A ForkJoinPool differs from other kinds of ExecutorService mainly by virtue of employing work-stealing: all threads in the pool attempt to find and execute tasks submitted to the pool and/or created by other active tasks (eventually blocking waiting for work if none exist). This enables efficient processing when most tasks spawn other subtasks (as do most ForkJoinTasks), as well as when many small tasks are submitted to the pool from external clients. Especially when setting asyncMode to true in constructors, ForkJoinPools may also be appropriate for use with event-style tasks that are never joined. All worker threads are initialized with Thread.isDaemon() set true.

The threads in the ExecutorService created by Executors are non-daemon threads. The exception, I believe, is Executors.newWorkStealingPool. I couldn't find documentation on this but this is how it's currently implemented at least. You can change this by supplying a custom ThreadFactory.

Daemon threads do not keep the JVM alive as stated in Thread's Javadoc (emphasis mine):

When a Java Virtual Machine starts up, there is usually a single non-daemon thread (which typically calls the method named main of some designated class). The Java Virtual Machine continues to execute threads until either of the following occurs:

  • The exit method of class Runtime has been called and the security manager has permitted the exit operation to take place.
  • All threads that are not daemon threads have died, either by returning from the call to the run method or by throwing an exception that propagates beyond the run method.

In your code you have the following (simplified):

public static void main(String[] args) {
    CompletableFuture<Student> future = CompletableFuture.supplyAsync(/* Supplier */)
            .andThen(/* Function One */)
            .andThen(/* Function Two */)
            .andThen(/* Function Three */)
            .andThen(/* Final Function */);
}

This uses the common ForkJoinPool which, as stated, uses daemon threads. The async code is launched from the main thread but you don't wait for it to complete. This means the main thread exits and therefore the JVM also exists. This happens before your async code had a chance to complete.

Then you tried a call to get():

public static void main(String[] args) throws Exception {
    Student student = CompletableFuture.supplyAsync(/* Supplier */)
            .andThen(/* Function One */)
            .andThen(/* Function Two */)
            .andThen(/* Function Three */)
            .andThen(/* Final Function */)
            .get();
}

This works because get() is a blocking call; meaning the main thread now waits until the async code completes before continuing. In other words, you are keeping the main thread alive which keeps the JVM alive.

When you use your own ExeuctorService from Executors.newCachedThreadPool() the executing threads are non-daemon. This means the async code is being run on these non-daemon threads which keeps the JVM alive until said code completes. In fact, it would keep the JVM alive even after the async code completes if you don't call ExecutorService.shutdown() (though a cached thread pool might allow all threads to die after a certain time).

In the question comments you ask if there is a more "elegant" way to keep the main thread alive (other than get()). I'm not sure what your definition of "elegant" is but there is the CompletableFuture.join() method. It waits, like get(), for the future to complete (normally or exceptionally) before it returns. Unlike get(), however, it doesn't throw checked exceptions; but the wait also cannot be interrupted.


You also state,

I just checked ForkJoinPool common threads are not daemon thread by using System.out.println("The thread is :: "+Thread.currentThread().getName() + Thread.currentThread().isDaemon());

I don't know why that's the case. Running this code:

public static void main(String[] args) {
    CompletableFuture.runAsync(() -> {
        Thread t = Thread.currentThread();
        System.out.printf("Thread_Name: %s, Daemon: %s%n", t.getName(), t.isDaemon());
    }).join();
}

Gives me this:

Thread_Name: ForkJoinPool.commonPool-worker-9, Daemon: true