I have the following code (resulting from my previous question) that schedules a task on a remote server, and then polls for completion using ScheduledExecutorService#scheduleAtFixedRate
. Once the task is complete, it downloads the result.
I want to return a Future
to the caller so they can decide when and how long to block, and give them the option to cancel the task.
My problem is that if the client cancels the Future
returned by the download
method, whenComplete
block doesn't execute. If I remove thenApply
it does. It's obvious I'm misunderstanding something about Future
composition... What should I change?
public Future<Object> download(Something something) {
String jobId = schedule(something);
CompletableFuture<String> job = pollForCompletion(jobId);
return job.thenApply(this::downloadResult);
}
private CompletableFuture<String> pollForCompletion(String jobId) {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
CompletableFuture<String> completionFuture = new CompletableFuture<>();
ScheduledFuture<?> checkFuture = executor.scheduleAtFixedRate(() -> {
if (pollRemoteServer(jobId).equals("COMPLETE")) {
completionFuture.complete(jobId);
}
}, 0, 10, TimeUnit.SECONDS);
completionFuture
.whenComplete((result, thrown) -> {
System.out.println("XXXXXXXXXXX"); //Never happens unless thenApply is removed
checkFuture.cancel(true);
executor.shutdown();
});
return completionFuture;
}
On the same note, if I do:
return completionFuture.whenComplete(...)
instead of
completionFuture.whenComplete(...);
return completionFuture;
whenComplete
also never executes. This seems very counterintuitive to me. Shouldn't logically the Future
returned by whenComplete
be the one I should hold on to?
EDIT:
I changed my code to explicitly back-propagate the cancellation. It's abhorrent and unreadable, but it works and I couldn't find a better way:
public Future<Object> download(Something something) throws ChartDataGenException, Exception {
String jobId = schedule(something);
CompletableFuture<String> job = pollForCompletion(jobId);
CompletableFuture<Object> resulting = job.thenApply(this::download);
resulting.whenComplete((result, thrown) -> {
if (resulting.isCancelled()) { //the check is not necessary, but communicates the intent better
job.cancel(true);
}
});
return resulting;
}
EDIT 2:
I've discovered tascalate-concurrent, a wonderful library providing a sane implementation of CompletionStage
, with support for dependent promises (via the DependentPromise
class) that can transparently back-propagate cancellations. Seems perfect for this use-case.
This should be enough:
DependentPromise
.from(pollForCompletion(jobId))
.thenApply(this::download, true); //true means the cancellation should back-propagate
Didn't test this approach, mind you.