Inspired by this question, I started to play with ordered vs unordered streams, parallel vs sequential streams and terminal operations that respect encounter order vs terminal operations that don't respect it.
In one answer to the linked question, code similar to this is shown:
List<Integer> ordered = Arrays.asList(
1, 2, 3, 4, 4, 3, 2, 1, 1, 2, 3, 4, 4, 3, 2, 1, 1, 2, 3, 4);
List<Integer> result = new CopyOnWriteArrayList<>();
ordered.parallelStream().forEach(result::add);
System.out.println(ordered);
System.out.println(result);
And the lists are indeed different. The unordered result list even changes from one run to another, showing that the result is actually non-deterministic.
So I created this other example:
CopyOnWriteArrayList<Integer> result2 = ordered.parallelStream()
.unordered()
.collect(Collectors.toCollection(CopyOnWriteArrayList::new));
System.out.println(ordered);
System.out.println(result2);
And I expected to see similar results, as the stream is both parallel and unordered (maybe unordered() is redundant, since it's already parallel). However, the resulting list is ordered, i.e. it's equal to the source list.
So my question is: why is the collected list ordered? Does collect always respect encounter order, even for parallel, unordered streams? Is the specific Collectors.toCollection(...) collector the one that forces encounter order?
Collectors.toCollection returns a Collector which lacks the Collector.Characteristics.UNORDERED characteristic. Another collector that specifies Collector.Characteristics.UNORDERED might behave differently.
That said: "unordered" means no guarantees, not guaranteed to vary. If the library finds it easiest to treat an unordered collection as ordered, it is allowed to do so, and that behavior is allowed to change release to release, on Tuesdays, or if there is a full moon.
(Note also that Collectors.toCollection does not require you to use a concurrent collection implementation if you're going to use parallel streams; toCollection(ArrayList::new) would work fine. That's because the collector doesn't have the Collector.Characteristics.CONCURRENT characteristic, so it uses a collection strategy that works for non-concurrent collections even with parallel streams.)
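The characteristics mentioned above can be inspected directly. The following is a minimal sketch, not part of the original answer; the class name CollectorCharacteristics and its helper methods are made up for illustration, and groupingByConcurrent is included only as an example of a collector that does report CONCURRENT.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical helper class, for illustration only.
class CollectorCharacteristics {
    static Set<Collector.Characteristics> ofToCollection() {
        Collector<Integer, ?, List<Integer>> c = Collectors.toCollection(ArrayList::new);
        return c.characteristics();
    }

    static Set<Collector.Characteristics> ofToSet() {
        Collector<Integer, ?, Set<Integer>> c = Collectors.toSet();
        return c.characteristics();
    }

    static Set<Collector.Characteristics> ofGroupingByConcurrent() {
        Collector<Integer, ?, ConcurrentMap<Integer, List<Integer>>> c =
                Collectors.groupingByConcurrent(x -> x % 2);
        return c.characteristics();
    }

    public static void main(String[] args) {
        // toCollection reports neither UNORDERED nor CONCURRENT.
        System.out.println("toCollection:         " + ofToCollection());
        // toSet reports UNORDERED, since a Set has no encounter order to preserve.
        System.out.println("toSet:                " + ofToSet());
        // groupingByConcurrent reports both CONCURRENT and UNORDERED.
        System.out.println("groupingByConcurrent: " + ofGroupingByConcurrent());
    }
}
```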
If you use an unordered stream but a collector that isn't UNORDERED, or vice versa, I doubt you get any guarantees from the framework. If there were a table, it would say "HERE BE DRAGONS UNDEFINED BEHAVIOR." I'd also expect some differences for different kinds of chained operations here, e.g. Eugene mentions findFirst varies here, even though findFirst is inherently an ordered operation -- unordered().findFirst() becomes equivalent to findAny().
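To make the findFirst point concrete, here is a small sketch. The class name FindFirstDemo and the SOURCE list are assumptions added for illustration. On an ordered stream findFirst must return the first element; once the stream is marked unordered, any element is a legal answer, so the second value may or may not differ from the first depending on the JDK version and the run.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class, for illustration only.
class FindFirstDemo {
    static final List<Integer> SOURCE =
            IntStream.rangeClosed(1, 1000).boxed().collect(Collectors.toList());

    // Ordered parallel stream: findFirst is bound to the encounter order.
    static Optional<Integer> orderedFirst() {
        return SOURCE.parallelStream().findFirst();
    }

    // Unordered parallel stream: findFirst may behave like findAny.
    static Optional<Integer> unorderedFirst() {
        return SOURCE.parallelStream().unordered().findFirst();
    }

    public static void main(String[] args) {
        System.out.println("ordered:   " + orderedFirst());
        // No particular element is guaranteed here.
        System.out.println("unordered: " + unorderedFirst());
    }
}
```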
For Stream.collect, I believe the current implementation has three strategies it chooses between:
- Sequential: starts one accumulator, accumulates elements into it (in encounter order, because why would you bother doing work to shuffle the elements? just accept them in the order you get them), calls the finisher.
- Parallel execution, concurrent collector, and the stream or the collector are unordered: one accumulator, shards the input, worker threads process the elements from each shard and add elements to the accumulator when they're ready, calls the finisher.
- Parallel execution, anything else: shards the input into N shards, each shard gets sequentially accumulated into its own distinct accumulator, the accumulators get combined with the combiner function, calls the finisher.
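One way to observe the three strategies listed above is to count how many accumulators the framework asks the supplier for. This is a rough sketch under the assumption that the implementation behaves as described; the class CollectStrategies and the method containers are invented for illustration, and the count for the non-concurrent parallel case depends on the machine and the JDK.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical demo class, for illustration only.
class CollectStrategies {
    // Runs one collect() and reports how many containers the supplier handed out.
    static int containers(boolean parallel, Collector.Characteristics... characteristics) {
        AtomicInteger created = new AtomicInteger();
        Collector<Integer, Queue<Integer>, Queue<Integer>> collector =
                Collector.<Integer, Queue<Integer>>of(
                        // A thread-safe queue, so the collector may honestly claim CONCURRENT.
                        () -> { created.incrementAndGet(); return new ConcurrentLinkedQueue<Integer>(); },
                        (queue, x) -> queue.add(x),
                        (left, right) -> { left.addAll(right); return left; },
                        characteristics);
        Stream<Integer> stream = IntStream.range(0, 100_000).boxed();
        if (parallel) {
            stream = stream.parallel();
        }
        stream.collect(collector);
        return created.get();
    }

    public static void main(String[] args) {
        // Sequential: a single accumulator.
        System.out.println("sequential: " + containers(false));
        // Parallel, non-concurrent collector: usually one accumulator per shard.
        System.out.println("parallel:   " + containers(true));
        // Parallel, concurrent and unordered collector: a single shared accumulator.
        System.out.println("concurrent: " + containers(true,
                Collector.Characteristics.CONCURRENT,
                Collector.Characteristics.UNORDERED));
    }
}
```

Passing only CONCURRENT without UNORDERED on this ordered source should fall back to the third strategy, since neither the stream nor the collector is unordered.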
Under the current implementation (I've checked Java 8 and Java 9), the unordered flag is ignored in the collect phase for non-concurrent collectors (Collector.Characteristics.UNORDERED is not set). Implementations are allowed to do that; see this somewhat similar question.
In the same question that you linked, I provided an example of how findFirst actually changed from JDK 8 to JDK 9.
The documentation of Stream#collect already mentions this:
When executed in parallel, multiple intermediate results may be instantiated, populated, and merged so as to maintain isolation of mutable data structures. Therefore, even when executed in parallel with non-thread-safe data structures (such as ArrayList), no additional synchronization is needed for a parallel reduction.
This means that Stream#collect does two primary things: split and merge.
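The split and merge roles are easiest to see in the three-argument form of collect. The sketch below is an illustration only (the class SplitAndMerge and the method collectInParallel are made-up names): each chunk of the parallel stream gets its own plain ArrayList from the supplier, and the combiner merges the partial lists, so no synchronization is needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical demo class, for illustration only.
class SplitAndMerge {
    static List<Integer> collectInParallel(int n) {
        return IntStream.range(0, n).boxed().parallel().collect(
                () -> new ArrayList<Integer>(),        // a fresh, non-thread-safe container per chunk
                (list, x) -> list.add(x),              // accumulate within one chunk
                (left, right) -> left.addAll(right));  // merge two partial results
    }

    public static void main(String[] args) {
        // The source is ordered and nothing here is unordered,
        // so the merged list follows the encounter order.
        System.out.println(collectInParallel(20));
    }
}
```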
But I have a special example in JDK 8 where you can get different results. :) When you create an unordered stream with Stream#generate, you can get a different result from Collectors#toSet on each run, for example:
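import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static java.util.stream.Collectors.toSet;
// Collect the same unordered, parallel stream ten times and keep the distinct results.
Set<Set<Integer>> result = IntStream.range(0, 10)
        .mapToObj(i -> unordered().parallel().collect(toSet()))
        .collect(toSet());
// Every run collects exactly 10000 elements.
assert result.stream().allMatch(set -> set.size() == 10000); // ok
// Surprisingly, this passes: the runs did not all collect the same elements.
assert result.size() > 1;
// Stream.generate produces an unordered stream.
static Stream<Integer> unordered() {
    AtomicInteger counter = new AtomicInteger();
    return Stream.generate(counter::getAndIncrement).limit(10000);
}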