stream and parallelStream

2019-02-17 05:41发布

问题:

I have a test code like this :

List<Integer> list = new ArrayList<>(1000000);

for(int i=0;i<1000000;i++){
    list.add(i);
}

List<String> values = new ArrayList<>(1000000);

list.stream().forEach(
    i->values.add(new Date().toString())
);

System.out.println(values.size()); 

Running this, I got a correct output: 1000000.

However, if I change the stream() to parallelStream(), as this:

 list.parallelStream().forEach(
    i->values.add(new Date().toString())
 );

I got a random output, e.g.: 920821.

What's wrong?

回答1:

An ArrayList is not synchronized. Trying to concurrently add elements to it is not defined. From forEach:

For parallel stream pipelines, this operation does not guarantee to respect the encounter order of the stream, as doing so would sacrifice the benefit of parallelism. For any given element, the action may be performed at whatever time and in whatever thread the library chooses.

In your second example, you end up with multiple threads calling add on the array list at the same time and ArrayList documentation says:

Note that this implementation is not synchronized. If multiple threads access an ArrayList instance concurrently, and at least one of the threads modifies the list structurally, it must be synchronized externally.

The wrong solution

If you change the use of an ArrayList to a Vector, you'll get the correct result, because this list implementation is synchronized. Its Javadoc says:

Unlike the new collection implementations, Vector is synchronized.

However, do not use it! Furthermore, it might end up being slower because of the explicit synchronization.

The correct approach

It is explicitly to avoid this situation that the Stream API provides the mutable reduction paradigm, with the use of the collect method. The following

List<String> values = list.stream().map(i -> "foo").collect(Collectors.toList());

will always provide the correct result, whether run in parallel or not. The Stream pipeline internally handles the concurrency and guarantees that it is safe to use a non-concurrent collector in a collect operation of a parallel stream. Collectors.toList() is a built-in collector accumulating the elements of a Stream into a list.



回答2:

Using a Consumer, you have to worry about thread safety. A simpler solution it to let the Stream API accumulate the results.

List<String> values = IntStream.range(0, 1_000_000).parallel()
                               .mapToObj(i -> new Date().toString())
                               .collect(Collectors.toList());

A key reason to avoid using a thread safe collector like Vector is it requires each thread to obtain a shared lock with is a bottleneck, i.e. you will spend time obtaining and releasing the lock and only one thread at a time can access it. You can easily end up with a solution which is slower than using one thread alone.



回答3:

values.add(String) is not thread safe. When you invoke this method from different threads without synchronization it is no guarantee that it will work as expected.

To fix that you can:

  • use thread-safe collection like Vector or CopyOnWriteArrayLis.
  • Explicitly synchronize your code. For example put synchronize(this){values.add(new Date().toString())} into your code. Note i-> is outside synchronize block
  • Or in this case map elments to get new stream like in @PeterLawrey answer: IntStream.range(0, 1_000_000).parallel().mapToObj(i -> new Date().toString()).collect(Collectors.toList());