CompletableFuture | thenApplyAsync vs thenCompose

2020-07-19 23:37发布

问题:

I was trying to understand CompletableFuture, and came across 2 methods, thenApplyAsync and thenCompose. Am trying to understand the difference between these two.

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName() + " Printing hello");
    return "Hello";
}).thenCompose((String s) -> {
  return CompletableFuture.supplyAsync(() -> {
      System.out.println(Thread.currentThread().getName() + " Adding abc");
      return "abc "+s;});
}).thenApplyAsync((String s) -> {
    System.out.println(Thread.currentThread().getName() + " Adding world");
    return s + " World";
}).thenApplyAsync((String s) -> {
    System.out.println(Thread.currentThread().getName() + " Adding name");
    if (false) {
       throw new RuntimeException("Oh no exception");
    }
    return s + " player!";
}).handle((String s, Throwable t) -> {
    System.out.println(s != null ? s : "BLANK");
    System.out.println(t != null ? t.getMessage() : "BLANK Exception");
    return s != null ? s : t.getMessage();
});

How I interpreted these methods as, thenApplyAsync will execute the supplied function in a different thread and return a result, but internally it is wrapped inside a CompletionStage. Whereas, thenCompose will get the return a reference to a CompletionStage.

So under what scenarios, will I use thenCompose over thenApplyAsync?

Went through this link, but it was talking about thenApply, which is somewhat different: CompletableFuture | thenApply vs thenCompose

回答1:

You would use thenCompose when you have an operation that returns a CompletionStage and thenApply when you have an operation that doesn't return a CompletionStage. -> This is was is in thenApply vs thenCompose

However the Async variants of the CompletionStage interface have subtle difference and rare use cases.

Let's take this example into account:

import java.util.concurrent.CompletableFuture;

public class Futures {

    public static void main(String[] args) throws InterruptedException {

        CompletableFuture<Void> c = CompletableFuture.runAsync(() -> {

            System.out.println("run1: " + Thread.currentThread().getId());
        });

        c.whenComplete((r, t) -> {
            System.out.println("I'm completed");
        });

        c.thenCompose(__ -> {
            System.out.println("thenCompose1: " + Thread.currentThread().getId());
            return CompletableFuture.runAsync(() -> {
                System.out.println("run2: " + Thread.currentThread().getId());
            });
        }).thenRunAsync(() -> {
            System.out.println("RunAsync1: " + Thread.currentThread().getId());
        });

        Thread.sleep(5000);

        System.out.println("Finished");
    }
}

Which outputs something like:

run1: 11
thenCompose1: 11
run2: 12
I'm completed
RunAsync1: 11
Finished

Notice that thenApplyAsync doesn't not affect the original future's completed state in contrast with the non async variants which do affect the completed state of the CompletionStage.

Use cases:

thenCompose

You might thenCompose when you have 2 asynchronous operations that need to be executed sequentially:

static CompletionStage<String> insert(Object database) {
    throw new UnsupportedOperationException();
}

static CompletionStage<Object> get(String id) {
    throw new UnsupportedOperationException();
}

public static void main(String[] args) throws InterruptedException {

    Object db = new Object(); // pretend this connects to a database
    CompletionStage<Object> recordInserted = insert(db).thenCompose(id -> get(id));
}

You can only retrieve the record after you inserted it.

Async variants of thenXXXX methods

Imagine for a moment that you have an application that allows users to register themselves and upon registration they will receive a confirmation email to confirm their account.

You don't want the user to be waiting for ever if the mail server is down or if it takes a long time to compose the email or perform additional checks.

You would then use thenApplyAsync to fire off the send email logic because it is not crucial to your system. A user can always go back and say "send me another email"

static CompletionStage<String> register(String username) {
    throw new UnsupportedOperationException();
}

static void sendConfirmationEmail(String username) {
    throw new UnsupportedOperationException();
}

public static void main(String[] args) throws InterruptedException {

    register("user").thenAcceptAsync(username -> sendConfirmationEmail(username));

}

Here your system will respond when the registration is complete but it will not wait for the email to be sent resulting in improved responsiveness of your system.

The uses cases I've found so far for the Async variants are scarce but they do exist.