Return a CompletableFuture without exposing execut

Posted 2020-07-24 06:33

Question:

I expose a method in a library, which returns a CompletableFuture. That method's computation happens on a single-threaded Executor which is my bottleneck, hence I do not want any subsequent work to happen on the same thread.

If I use the simple approach of returning the result of "supplyAsync", I'll be exposing my precious thread to the callers, who may be adding synchronous operations (e.g. via thenAccept) which could take some CPU time on that thread.

Repro below:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CfPlayground {
    private ExecutorService preciousExecService = Executors.newFixedThreadPool(1);

    CfPlayground() {}

    private static void log(String msg) {
        System.out.println("[" + Thread.currentThread().getName() + "] " + msg);
    }

    CompletableFuture<String> asyncOp(String param) {
        return CompletableFuture.supplyAsync(() -> {
            log("In asyncOp");
            return "Hello " + param;
        }, preciousExecService);
    }

    void syncOp(String salutation) {
        log("In syncOp: " + salutation);
    }

    void run() {
        log("run");
        asyncOp("world").thenAccept(this::syncOp);
    }

    public static void main(String[] args) throws InterruptedException {
        CfPlayground compFuture = new CfPlayground();
        compFuture.run();
        Thread.sleep(500);
        compFuture.preciousExecService.shutdown();
    }
}

This indeed prints:

[main] run
[pool-1-thread-1] In asyncOp
[pool-1-thread-1] In syncOp: Hello world

One solution I found was to introduce another Executor and add a no-op thenApplyAsync with that executor before returning the CompletableFuture:

    CompletableFuture<String> asyncOp(String param) {
        return CompletableFuture.supplyAsync(() -> {
            log("In asyncOp");
            return "Hello " + param;
        }, preciousExecService).thenApplyAsync(s -> s, secondExecService);
    }

This works, but doesn't feel super elegant - is there a better way to do this?

Answer 1:

There is no feature to detach your completion from the execution of the dependent action. When the thread chaining the dependent action has already completed the registration and your executor’s thread completes the future, which other thread ought to execute the dependent action if no other executor was given?

Your approach of chaining another action with a different executor seems to be the best you can get. However, it’s important to note that in case of an exceptional completion, the exception gets propagated without evaluating functions passed to thenApply or thenApplyAsync, so the switch to the second executor never takes place. This exception propagation could again lead to an exposure of the thread, if the caller chained an action like whenComplete, handle, or exceptionally.
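To make the exceptional case concrete, here is a minimal sketch that records which thread runs a caller's handle callback behind the thenApplyAsync shield. The class name, the thread names "precious" and "second", the callbackThread helper, and the latch are all assumptions added for illustration. The latch only forces the unlucky ordering in which the caller has already chained its action before the future completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExceptionalExposureDemo {
    // Hypothetical single-thread executors with readable thread names.
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true); // let the JVM exit without an explicit shutdown
            return t;
        });
    }

    static final ExecutorService PRECIOUS = named("precious");
    static final ExecutorService SECOND = named("second");

    // Returns the name of the thread that ran the caller's handle() callback.
    static String callbackThread(boolean fail) {
        CountDownLatch registered = new CountDownLatch(1);
        CompletableFuture<String> shielded = CompletableFuture.supplyAsync(() -> {
            try {
                registered.await(); // wait until the caller has chained its action
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (fail) {
                throw new IllegalStateException("boom");
            }
            return "Hello world";
        }, PRECIOUS).thenApplyAsync(s -> s, SECOND); // the no-op shield from the question

        CompletableFuture<String> observed =
                shielded.handle((result, error) -> Thread.currentThread().getName());
        registered.countDown();
        return observed.join();
    }

    public static void main(String[] args) {
        System.out.println("normal completion:      " + callbackThread(false));
        System.out.println("exceptional completion: " + callbackThread(true));
    }
}
```

With the OpenJDK implementation of CompletableFuture, the normal case should report the "second" thread and the exceptional case the "precious" thread, which is the exposure described above.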

On the other hand, you don’t need to specify a secondary executor, as you can use the async method without executor parameter, to get the default (common Fork/Join) pool.

So chaining .whenCompleteAsync((x,y) -> {}) is the best solution to your problem so far.
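As a rough sketch of that suggestion, the variant below chains a no-op whenCompleteAsync and again reports which thread runs the caller's handle callback, for both outcomes. The class name, the "precious" thread name, the callbackThread helper, and the latch are illustrative assumptions, not part of the original code. The latch again makes sure the caller's action is already chained when the future completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DetachDemo {
    // Hypothetical stand-in for the library's single-threaded executor.
    static final ExecutorService PRECIOUS = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "precious");
        t.setDaemon(true); // let the JVM exit without an explicit shutdown
        return t;
    });

    // Returns the name of the thread that ran the caller's handle() callback.
    static String callbackThread(boolean fail) {
        CountDownLatch registered = new CountDownLatch(1);
        CompletableFuture<String> detached = CompletableFuture.supplyAsync(() -> {
            try {
                registered.await(); // wait until the caller has chained its action
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (fail) {
                throw new IllegalStateException("boom");
            }
            return "Hello world";
        }, PRECIOUS)
        // No-op stage that runs on the default async executor for values and exceptions alike.
        .whenCompleteAsync((result, error) -> { });

        CompletableFuture<String> observed =
                detached.handle((result, error) -> Thread.currentThread().getName());
        registered.countDown();
        return observed.join();
    }

    public static void main(String[] args) {
        System.out.println("normal completion:      " + callbackThread(false));
        System.out.println("exceptional completion: " + callbackThread(true));
    }
}
```

The exact thread name depends on the JVM's default async executor (usually a common Fork/Join pool worker), but in neither case should it be the "precious" thread.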



Answer 2:

You could just change the method signature to return a Future instead of a CompletableFuture:

Future<String> asyncOp(String param) {
    return CompletableFuture.supplyAsync(() -> {
        log("In asyncOp");
        return "Hello " + param;
    }, preciousExecService);
}

That way, the run() method would no longer compile, because Future does not declare thenAccept:

void run() {
    log("run");
    asyncOp("world").thenAccept(this::syncOp);
}

The caller would still be able to cast the returned Future back to a CompletableFuture, but that would be quite a misuse of your API and it cannot happen by accident.