Experimenting with streams, I ran into the following behavior, which I don't quite understand. I created a parallel stream from an iterator and noticed that it did not seem to exhibit any parallelism. In the example below, I print a counter to the console for two parallel streams, one created from an iterator and the other from a list. The stream created from the list behaved as I expected, printing the counter in non-sequential order, but the stream created from the iterator printed the counter in sequential order. Am I creating the parallel stream from an iterator incorrectly?
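```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class Main {
    // Shared and unsynchronized: incremented concurrently by the parallel streams below
    private static int counter = 0;

    public static void main(String[] args) {
        List<Integer> lstr = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
        Iterator<Integer> iter = lstr.iterator();
        System.out.println("Iterator Stream: ");
        StreamSupport.stream(Spliterators.spliteratorUnknownSize(iter, Spliterator.IMMUTABLE | Spliterator.CONCURRENT), true).forEach(i -> {
            System.out.print(counter + " ");
            counter++;
        });
        counter = 0;
        System.out.println("\nList Stream: ");
        lstr.parallelStream().forEach(i -> {
            System.out.print(counter + " ");
            counter++;
        });
    }
}
```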
There is no guarantee that parallel processing prints the counter in non-sequential order. Further, since you're updating a variable without synchronization, updates made by other threads may be lost, so the results may be entirely inconsistent.
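A minimal sketch of a thread-safe variant, assuming you only need to count the elements: an `AtomicInteger` replaces the plain `int` field, so no increment is lost, although the order in which values are printed is still unspecified. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class AtomicCounterDemo {
    // Counts the elements of a parallel stream with an atomic counter.
    // getAndIncrement() is a single atomic step, so increments made by
    // different worker threads are never lost; the print order is still unspecified.
    static int countInParallel(int n) {
        List<Integer> list = IntStream.rangeClosed(1, n).boxed().collect(Collectors.toList());
        AtomicInteger counter = new AtomicInteger();
        list.parallelStream().forEach(i -> System.out.print(counter.getAndIncrement() + " "));
        System.out.println();
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("final count: " + countInParallel(100));
    }
}
```

The final count is always 100 here, whereas the unsynchronized `counter++` in the question can end up lower.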
Besides that, an `Iterator` has to be polled sequentially, so to get at least some gain from parallel processing, its elements have to be buffered. But without a known size, there is no good estimate of how many elements to buffer. The default strategy uses batches of more than a thousand elements and does not split the work well.
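To see that buffering in action, the sketch below asks an unknown-size iterator spliterator for its first split and reports how many elements went into the resulting chunk. It relies on OpenJDK implementation details (a first batch of 1024 elements), not on anything the `Spliterator` specification promises; the class and method names are hypothetical.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class IteratorSplitDemo {
    // Returns how many elements the first trySplit() of an unknown-size
    // iterator spliterator buffers into its prefix chunk.
    static long firstChunkSize(int n) {
        List<Integer> list = IntStream.rangeClosed(1, n).boxed().collect(Collectors.toList());
        Iterator<Integer> iter = list.iterator();
        Spliterator<Integer> spliterator = Spliterators.spliteratorUnknownSize(
                iter, Spliterator.IMMUTABLE | Spliterator.CONCURRENT);
        // With no known size, the estimate is Long.MAX_VALUE
        System.out.println("estimate before split: " + spliterator.estimateSize());
        // trySplit() polls the iterator sequentially and copies a batch into an array
        Spliterator<Integer> prefix = spliterator.trySplit();
        return prefix == null ? 0 : prefix.estimateSize();
    }

    public static void main(String[] args) {
        System.out.println("n = 100:  first chunk = " + firstChunkSize(100));
        System.out.println("n = 5000: first chunk = " + firstChunkSize(5_000));
    }
}
```

On OpenJDK, `firstChunkSize(100)` returns 100, meaning the whole input of the question lands in a single chunk, while `firstChunkSize(5_000)` returns 1024.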
So if you use more than a thousand elements, you might notice more parallel activity. Alternatively, you may specify a size by constructing the stream with `StreamSupport.stream(Spliterators.spliterator(iter, lstr.size(), 0), true)`. Then the internally used buffering will be adapted.
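A sketch of the sized alternative, assuming the element count is known before the stream is built. `Spliterators.spliterator(Iterator, long, int)` additionally reports `SIZED | SUBSIZED` (unless `CONCURRENT` is passed), so the framework gets a real size estimate to plan its splits with. Which threads do the work, and how much each does, still varies between runs. The class and helper names are hypothetical.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class SizedIteratorStreamDemo {
    // Builds a parallel stream from an iterator while telling the
    // spliterator the exact number of elements up front.
    static Stream<Integer> sizedParallelStream(List<Integer> source) {
        Iterator<Integer> iter = source.iterator();
        // Characteristics 0: SIZED | SUBSIZED are added automatically
        // because CONCURRENT is not among the given characteristics
        Spliterator<Integer> split = Spliterators.spliterator(iter, source.size(), 0);
        return StreamSupport.stream(split, true);
    }

    public static void main(String[] args) {
        List<Integer> list = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
        // Thread names and per-thread counts depend on the pool and on timing
        Map<String, Long> perThread = sizedParallelStream(list)
                .collect(Collectors.groupingBy(x -> Thread.currentThread().getName(), Collectors.counting()));
        System.out.println(perThread);
    }
}
```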
Still, the `List`'s stream will have more efficient parallel processing, as it not only knows its size but also supports splitting the workload by utilizing the random-access nature of the underlying data structure.
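For comparison, a sketch of how an `ArrayList` spliterator divides its range: in OpenJDK each `trySplit()` hands off half of the remaining indices, and every piece still reports its exact size. The recursive `split` helper is a hypothetical, simplified stand-in for what the stream framework does internally; it uses a fixed minimum chunk size instead of the framework's own threshold.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.IntStream;

class ListSplitDemo {
    // Recursively splits a spliterator until chunks are no larger than
    // minSize (or cannot be split), recording each final chunk's size.
    static void split(Spliterator<Integer> s, int minSize, List<Long> out) {
        Spliterator<Integer> prefix;
        if (s.estimateSize() > minSize && (prefix = s.trySplit()) != null) {
            split(prefix, minSize, out);
            split(s, minSize, out);
        } else {
            out.add(s.estimateSize());
        }
    }

    static List<Long> chunkSizes(int n, int minSize) {
        List<Integer> list = new ArrayList<>();
        IntStream.rangeClosed(1, n).forEach(list::add);
        Spliterator<Integer> s = list.spliterator();
        // Random access lets every split know its exact size
        System.out.println("SUBSIZED: " + s.hasCharacteristics(Spliterator.SUBSIZED));
        List<Long> sizes = new ArrayList<>();
        split(s, minSize, sizes);
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(chunkSizes(100, 25));
    }
}
```

With 100 elements and a minimum of 25, `chunkSizes(100, 25)` yields `[25, 25, 25, 25]`: four evenly sized chunks that can go to different threads, unlike the single 100-element chunk produced from the iterator.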
The current implementation will try to parallelize a stream produced from an iterator by buffering up the values and dispatching them to several threads, but this only kicks in if the stream is long enough. Increase your list to 10000 elements and you should see parallelism.
With a large list, it might be easier to see how the elements are distributed across threads if you collect into a map grouped by thread name. Replace your `.forEach` with `.collect(Collectors.groupingBy(x -> Thread.currentThread().getName(), Collectors.counting()))`.
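A sketch of that suggestion put together: it builds the iterator-backed stream from the question over 10,000 elements and counts how many elements each thread processed. The thread names and counts differ from run to run; only the total is fixed. The class and method names are hypothetical.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

class ThreadDistributionDemo {
    // Counts how many elements each thread processed for a parallel
    // stream built from an iterator of unknown size.
    static Map<String, Long> elementsPerThread(int n) {
        List<Integer> list = IntStream.rangeClosed(1, n).boxed().collect(Collectors.toList());
        Iterator<Integer> iter = list.iterator();
        return StreamSupport.stream(
                        Spliterators.spliteratorUnknownSize(iter, Spliterator.IMMUTABLE | Spliterator.CONCURRENT), true)
                // The classifier runs on whichever thread processes the element
                .collect(Collectors.groupingBy(x -> Thread.currentThread().getName(), Collectors.counting()));
    }

    public static void main(String[] args) {
        elementsPerThread(10_000).forEach((thread, count) -> System.out.println(thread + ": " + count));
    }
}
```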