JDK8 CompletableFuture.supplyAsync how to deal wit

2020-02-10 14:50发布

问题:

CompletableFuture.supplyAsync(
() -> {
    transporter.write(req);
    //here take the value from a blocking queue,will throw a interruptedException
    return responseQueue.take();
},  executorService);

The common method to deal with interruptedException is either to interrupt again or direct throw interruptedException, but both cannot work. Anyone have the idea?

回答1:

I change the code like this.

    CompletableFuture<Rep> result = new CompletableFuture<>();
    CompletableFuture.runAsync(() -> {

        transporter.write(req);
        try {
            Rep rep = responseQueue.take();
            result.complete(rep);
        } catch (InterruptedException e) {
            result.completeExceptionally(e);
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            result.completeExceptionally(e);
        }

    }, executorService);
    return result;


回答2:

I ran into the same question, but after reading more from comments here and reference book I think you can do either one of these two:

1 (what I end up doing):

CompletableFuture.runAsync(() -> {
    transporter.write(req);
    try {
        Rep rep = responseQueue.take();
        result.complete(rep);
    } catch (Exception e) {
        throw new CompletionException(e);
    }
}, executorService);
return result;

or 2:

CompletableFuture<Rep> result = new CompletableFuture<>();
new Thread(()-> {
    transporter.write(req);
    try {
        Rep rep = responseQueue.take();
        result.complete(rep);
    } catch (Exception e) {
        retsult.completeExceptionally(e);
    }
}).start();

I know the 2nd one does not use the executorService, but I feel the whole point of using CompletableFuture is utilizing the CompletionStage APIs in functional-style.



回答3:

As lambda functions don't support throwing exceptions, I think Java developers will need a new paradigm. One thing that comes to mind is as follows:

public class ResultWrapper<R, E extends Exception> {
    E exception;
    R result;
}

Lambda functions can return instances of this wrapper. (Edit: your case)

CompletableFuture<ResultWrapper<String, InterruptedException>> aFuture = ...;
...
aFuture.supplyAsync(
() -> {
    try {
        transporter.write(req);
    } catch(InterruptedException e) {
        ResultWrapper<String, InterruptedException> r = new ResultWrapper<>();
        r.exception = e;
        r.result = null;
        return r;
    }
    ...
},  executorService);


回答4:

@antak mentioned it buried in a comment, but I think the correct answer here is:

For CompletableFuture.supplyAsync() wrap it in java.util.concurrent.CompletionException and rethrow it.

So the sample code would look something like:

CompletableFuture.supplyAsync(
    () -> {
        transporter.write(req);
        try {
            //here take the value from a blocking queue,will throw a interruptedException
            return responseQueue.take();
        }
        catch (InterruptedException e) {
            throw new CompletionException(e);
        }
    },  executorService);