I have some test code like this:
List<Integer> list = new ArrayList<>(1000000);
for (int i = 0; i < 1000000; i++) {
    list.add(i);
}
List<String> values = new ArrayList<>(1000000);
list.stream().forEach(
    i -> values.add(new Date().toString())
);
System.out.println(values.size());
Running this, I get the correct output: 1000000.
However, if I change stream() to parallelStream(), like this:
list.parallelStream().forEach(
    i -> values.add(new Date().toString())
);
I get a seemingly random output that is smaller than expected, e.g. 920821.
What's wrong?
An ArrayList is not synchronized, so the result of adding elements to it concurrently is undefined. From the documentation of forEach:
For parallel stream pipelines, this operation does not guarantee to respect the encounter order of the stream, as doing so would sacrifice the benefit of parallelism. For any given element, the action may be performed at whatever time and in whatever thread the library chooses.
In your second example, you end up with multiple threads calling add on the ArrayList at the same time, and the ArrayList documentation says:
Note that this implementation is not synchronized. If multiple threads access an ArrayList instance concurrently, and at least one of the threads modifies the list structurally, it must be synchronized externally.
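To see why elements go missing rather than the program simply failing, it can help to replay one unlucky interleaving by hand. The following is a simplified, single-threaded sketch, not the real JDK source: the class, its fields, and the method name are made up for illustration, and it only assumes that an array-backed add boils down to "read size, write the slot, store size + 1".

```java
import java.util.Arrays;

// Simplified model of an unsynchronized array-backed list (hypothetical, not JDK code).
class LostUpdateSketch {
    static String[] elements = new String[4];
    static int size = 0;

    // Replays, step by step, two add() calls whose reads of size overlap.
    static int simulateInterleavedAdds() {
        elements = new String[4];
        size = 0;
        int seenByA = size;        // thread A reads size (0)
        int seenByB = size;        // thread B reads size (0) before A has updated it
        elements[seenByA] = "a";   // A writes slot 0
        size = seenByA + 1;        // A stores size = 1
        elements[seenByB] = "b";   // B overwrites slot 0, so "a" is lost
        size = seenByB + 1;        // B stores size = 1 again
        return size;               // two adds happened, but size is 1
    }

    public static void main(String[] args) {
        System.out.println(simulateInterleavedAdds() + " " + Arrays.toString(elements));
    }
}
```

Each such collision costs one element, which would explain a final size somewhat below 1000000 that changes from run to run.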
The wrong solution
If you replace the ArrayList with a Vector, you'll get the correct result, because that list implementation is synchronized. Its Javadoc says:
Unlike the new collection implementations, Vector is synchronized.
However, do not use it! Every add has to acquire the same lock, so the parallel version might even end up slower than the sequential one.
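For completeness, here is a minimal sketch of what the Vector variant looks like, so the trade-off is concrete. The class and method names are hypothetical. The count comes out right, but the element order is unspecified and every add goes through the Vector's single lock.

```java
import java.util.Date;
import java.util.List;
import java.util.Vector;
import java.util.stream.IntStream;

// Hypothetical demo class: forEach on a parallel stream adding into a Vector.
class VectorForEachDemo {
    static List<String> fill(int n) {
        List<String> values = new Vector<>(n);
        IntStream.range(0, n).parallel()
                 // Vector.add is synchronized, so no element is lost,
                 // but all worker threads queue up on the same lock.
                 .forEach(i -> values.add(new Date().toString()));
        return values;
    }

    public static void main(String[] args) {
        System.out.println(fill(1_000_000).size());
    }
}
```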
The correct approach
It is precisely to avoid this situation that the Stream API provides the mutable reduction paradigm, through the collect method. The following
List<String> values = list.stream().map(i -> "foo").collect(Collectors.toList());
will always provide the correct result, whether run in parallel or not. The Stream pipeline handles the concurrency internally and guarantees that it is safe to use a non-concurrent collector in a collect operation of a parallel stream. Collectors.toList() is a built-in collector that accumulates the elements of a Stream into a list.
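To make "mutable reduction" less abstract, here is a sketch that spells out the three pieces by hand using the three-argument collect overload: a supplier that creates a fresh container, an accumulator that adds one element to it, and a combiner that merges two containers. This is only conceptually similar to what Collectors.toList() does, not a copy of it, and the class name, method name, and "item-" strings are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical demo class: mutable reduction written out explicitly.
class MutableReductionDemo {
    static List<String> collectInParallel(int n) {
        ArrayList<String> merged = IntStream.range(0, n).parallel()
                .mapToObj(i -> "item-" + i)
                .collect(ArrayList::new,      // supplier: a fresh, private list per chunk of work
                         ArrayList::add,      // accumulator: only ever called on a private list
                         ArrayList::addAll);  // combiner: merges partial lists in encounter order
        return merged;
    }

    public static void main(String[] args) {
        List<String> result = collectInParallel(1_000_000);
        System.out.println(result.size());
    }
}
```

Because no list is ever shared between threads while it is being filled, a plain ArrayList is enough here, and the result keeps the encounter order of the source.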
When you use a Consumer with forEach, you have to worry about thread safety yourself. A simpler solution is to let the Stream API accumulate the results.
List<String> values = IntStream.range(0, 1_000_000).parallel()
        .mapToObj(i -> new Date().toString())
        .collect(Collectors.toList());
A key reason to avoid a thread-safe collection like Vector is that it requires every thread to obtain a shared lock, which is a bottleneck: you spend time acquiring and releasing the lock, and only one thread at a time can access the list. You can easily end up with a solution that is slower than using a single thread.
values.add(String) is not thread-safe. When you invoke this method from different threads without synchronization, there is no guarantee that it will work as expected.
To fix that you can:
- Use a thread-safe collection like Vector or CopyOnWriteArrayList.
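- Explicitly synchronize your code. For example, put
synchronized (values) { values.add(new Date().toString()); }
into the body of your lambda. Note that i-> stays outside the synchronized block. Locking on values rather than this also works when the code runs in a static method.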
- Or, in this case, map the elements to get a new stream, as in @PeterLawrey's answer:
IntStream.range(0, 1_000_000).parallel().mapToObj(i -> new Date().toString()).collect(Collectors.toList());
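As a rough sketch of the explicit-synchronization option from the list above (class and method names are hypothetical): the string is built outside the lock so that only the add itself is serialized. Note also that CopyOnWriteArrayList copies its backing array on every write, so it is a poor fit for a million additions even though it is thread-safe.

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical demo class: guarding a plain ArrayList with a synchronized block.
class SynchronizedAddDemo {
    static List<String> fill(int n) {
        List<String> values = new ArrayList<>(n);
        IntStream.range(0, n).parallel().forEach(i -> {
            String s = new Date().toString(); // computed outside the lock
            synchronized (values) {           // only one thread may add at a time
                values.add(s);
            }
        });
        return values;
    }

    public static void main(String[] args) {
        System.out.println(fill(1_000_000).size());
    }
}
```

This yields the right count, but the order of the elements is unspecified and the lock is still a point of contention, which is why the collect-based version is usually the better choice.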