Why does stream parallel() not use all available threads?

Asked 2020-07-26 15:37

I tried to run 100 sleep tasks in parallel using a Java 8 (1.8.0_172) stream.parallel() submitted inside a custom ForkJoinPool with 100+ threads available. Each task sleeps for 1s, so I expected the whole job to finish after ~1s, given that the 100 sleeps could run in parallel. However, I observe a runtime of ~7s.

    @Test
    public void testParallelStream() throws Exception {
        final int REQUESTS = 100;
        ForkJoinPool forkJoinPool = null;
        try {
            // new ForkJoinPool(256): same results for all tried values of REQUESTS
            forkJoinPool = new ForkJoinPool(REQUESTS);
            forkJoinPool.submit(() -> {

                IntStream stream = IntStream.range(0, REQUESTS);
                final List<String> result = stream.parallel().mapToObj(i -> {
                    try {
                        System.out.println("request " + i);
                        Thread.sleep(1000);
                        return Integer.toString(i);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }).collect(Collectors.toList());
                // assertThat(result).hasSize(REQUESTS);
            }).join();
        } finally {
            if (forkJoinPool != null) {
                forkJoinPool.shutdown();
            }
        }
    }

The output shows that ~16 stream elements are executed, then there is a pause of 1s, then another ~16, and so on. So even though the ForkJoinPool was created with 100 threads, only about 16 of them get used.

This pattern emerges as soon as I use more than 23 threads:

1-23 threads: ~1s
24-35 threads: ~2s
36-48 threads: ~3s
...

    System.out.println(Runtime.getRuntime().availableProcessors());
    // Output: 4
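
To double-check that the limit really is the number of worker threads used (this sketch is not part of my original test; the class name is made up), I counted the distinct threads the parallel stream runs on:

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ForkJoinPool;
    import java.util.stream.IntStream;

    public class ThreadCountCheck {
        public static void main(String[] args) throws Exception {
            final int REQUESTS = 100;
            ForkJoinPool pool = new ForkJoinPool(REQUESTS);
            Set<String> workers = ConcurrentHashMap.newKeySet();
            pool.submit(() ->
                    IntStream.range(0, REQUESTS).parallel().forEach(i -> {
                        // record which worker thread runs this element
                        workers.add(Thread.currentThread().getName());
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    })
            ).join();
            pool.shutdown();
            // on this unpatched Java 8 setup this prints far fewer than 100 (~16 here)
            System.out.println("distinct worker threads: " + workers.size());
        }
    }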

2 Answers
冷血范 · 2020-07-26 16:20

As you wrote it, you let the stream decide the parallelism of the executions.

There you have the effect that the parallel stream tries to outsmart you by splitting the data up evenly, without taking the number of available threads into account. This is good for CPU-bound operations, where it is not useful to have more threads than CPU cores, but it is not made for tasks that need to wait for I/O.

Why not force-feed all your items sequentially to the ForkJoinPool, so it's forced to use all available threads?

        IntStream stream = IntStream.range(0, REQUESTS);
        List<ForkJoinTask<String>> results = stream
                .mapToObj(i -> forkJoinPool.submit(() -> {
                    try {
                        System.out.println("request " + i);
                        Thread.sleep(1000);
                        return Integer.toString(i);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }))
                .collect(Collectors.toList());
        results.forEach(ForkJoinTask::join);

This takes less than two seconds on my machine.

等我变得足够好 · 2020-07-26 16:36

Since the Stream implementation’s use of the Fork/Join pool is an implementation detail, the trick to force it to use a different Fork/Join pool is undocumented as well and seems to work by accident, i.e. there’s a hardcoded constant determining the actual parallelism, depending on the default pool’s parallelism. So using a different pool was not foreseen, originally.

However, it has been recognized that using a different pool with an inappropriate target parallelism is a bug, even if this trick is not documented, see JDK-8190974.

It has been fixed in Java 10 and backported to Java 8, update 222.

So a simple solution would be to update the Java version.
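
If you want to see the mismatch the stream machinery is keying off, a rough sketch (class name made up; the exact constant involved is an implementation detail) is to compare the common pool's parallelism with the custom pool's:

    import java.util.concurrent.ForkJoinPool;

    public class ParallelismCheck {
        public static void main(String[] args) {
            ForkJoinPool customPool = new ForkJoinPool(100);
            // the stream's split sizing is derived from the common pool's
            // parallelism (CPU count - 1 by default), not from the pool the
            // task happens to run in
            System.out.println("common pool parallelism: "
                    + ForkJoinPool.getCommonPoolParallelism()); // e.g. 3 on a 4-core box
            System.out.println("custom pool parallelism:  "
                    + customPool.getParallelism());             // 100
            customPool.shutdown();
        }
    }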

You may also change the default pool’s parallelism, e.g.

    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "100");

before doing any Fork/Join activity.

But this may have unintended effects on other parallel operations.
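
If you do go that route, a minimal sketch (hypothetical class name, assuming the same 100 one-second tasks as in the question) would set the property before anything touches the common pool:

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    public class CommonPoolParallelismDemo {
        public static void main(String[] args) {
            // must run before the common pool is initialized, i.e. before any
            // parallel stream or other Fork/Join activity in this JVM
            System.setProperty(
                    "java.util.concurrent.ForkJoinPool.common.parallelism", "100");

            long start = System.nanoTime();
            List<String> result = IntStream.range(0, 100).parallel().mapToObj(i -> {
                try {
                    Thread.sleep(1000);
                    return Integer.toString(i);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }).collect(Collectors.toList());

            // with 100 common-pool threads this should finish in roughly 1s
            // instead of the ~7s observed in the question
            System.out.println(result.size() + " tasks in "
                    + (System.nanoTime() - start) / 1_000_000 + " ms");
        }
    }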
