How can I asynchronously execute 20 Runnable tasks(or 1 task 20 times), using 5 CompletableFutures?
That's what I've got:
Runnable task = () -> {
long startTime = System.currentTimeMillis();
Random random = new Random();
while (System.currentTimeMillis() - startTime < 3000) {
DoubleStream.generate(() -> random.nextDouble())
.limit(random.nextInt(100))
.map(n -> Math.cos(n))
.sum();
}
System.out.println("Done");
};
for (int i = 0; i < 4; i++) {
CompletableFuture<Void> future1 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future3 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future4 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future5 = CompletableFuture.runAsync(task);
future1.get();
future2.get();
future3.get();
future4.get();
future5.get();
}
If I execute this code, I can see that it only runs 3 future.get() asynchronously:
3 and then 2 that's left during 1 for() iteration
So, I would like to do all 20 tasks, as asynchronously as possible
You can use allOf to run several tasks simultaneously as one. First I create a combined of 5 tasks (the same as in your question) but then I added 10 instead (and only loped twice) and got half the execution time.
for (int i = 0; i < 2; i++) {
CompletableFuture<Void> future1 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(task);
// and so on until ten
CompletableFuture<Void> future10 = CompletableFuture.runAsync(task);
CompletableFuture<Void> combined = CompletableFuture.allOf(future1, future2, future3, future4, future5, future6, future7, future8, future9, future10);
combined.get();
}
The default executor of CompletableFuture
is the common pool of the ForkJoinPool
, which has a default target parallelism matching the number of CPU cores minus one. So if you have four cores, at most three jobs will get executed asynchronously. Since you are forcing a wait for completion every 5 jobs, you’ll get three parallel executions, followed by two parallel executions in every loop iteration.
If you want to get a particular execution strategy like parallelism of your choice, the best way is to specify a properly configured executor. Then, you should let the executor manage the parallelism instead of waiting in a loop.
ExecutorService pool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 20; i++) {
CompletableFuture.runAsync(task, pool);
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.DAYS); // wait for the completion of all tasks
This allows five parallel jobs, but will let each of the five threads pick up a new job immediately after one completed, instead of waiting for the next loop iteration.
But when you say
So, I would like to do all 20 tasks, as asynchronously as possible
it’s not clear why you are enforcing a wait after scheduling five jobs at all. The maximum parallelism can be achieve via
ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
CompletableFuture.runAsync(task, pool);
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.DAYS); // wait for the completion of all tasks
This may spawn as many threads as jobs, unless one job completes before all have been scheduled, as in this case the worker thread may pick up a new job.
But this logic doesn’t require a CompletableFuture
at all. You can also use:
ExecutorService pool = Executors.newCachedThreadPool();
// schedule 20 jobs and return when all completed
pool.invokeAll(Collections.nCopies(20, Executors.callable(task)));
pool.shutdown();
But when your job does not involve I/O or any other kind of waiting resp. releasing the CPU, there is no point in creating more threads than CPU cores. A pool configured to the number of processors is preferable.
ExecutorService pool = Executors.newWorkStealingPool(
Runtime.getRuntime().availableProcessors());
// schedule 20 jobs at return when all completed
pool.invokeAll(Collections.nCopies(20, Executors.callable(task)));
pool.shutdown();
In your special case this likely runs slower as your jobs use the system time to appear running faster when having more threads than cores, but are actually doing less work then. But for ordinary computational task, this will improve the performance.
Set the following system property to the number of threads you want the common fork join pool to use:
java.util.concurrent.ForkJoinPool.common.parallelism
See ForkJoinPool
The reason being that you do not specify your own fork join pool when constructing your completable futures, so it implicitly uses
ForkJoinPool.commonPool()
See CompletableFurure