stream collect accumulator/combiner order

Posted 2020-05-27 11:39

Question:

This is basically a follow-up of this answer of mine.

Suppose that I am working on a custom collector, and suppose that the accumulator always adds some element to the collection returned by the supplier. Is there any chance that, when the combiner is called, one of the intermediate results will be empty? An example is probably a lot simpler to understand.

Suppose I have a List of numbers and I want to split it into a List of Lists, where 2 is the separator. For example, given 1, 2, 3, 4, 2, 8, the result should be [[1], [3, 4], [8]]. This is not really complicated to achieve (don't judge the code too much, I wrote something fast, just so I could write this question).

List<List<Integer>> result = Stream.of(1, 2, 3, 4, 2, 8)
            .collect(Collector.of(
                    () -> new ArrayList<>(),
                    (list, elem) -> {
                        if (list.isEmpty()) {
                            List<Integer> inner = new ArrayList<>();
                            inner.add(elem);
                            list.add(inner);
                        } else {
                            if (elem == 2) {
                                list.add(new ArrayList<>());
                            } else {
                                List<Integer> last = list.get(list.size() - 1);
                                last.add(elem);
                            }
                        }
                    },
                    (left, right) -> {
                        // This is the real question here:
                        // can left or right be empty here?
                        return left;
                    }));

This is probably irrelevant in this example, but the question is: can one of the elements in the combiner be an empty List? I am really inclined to say NO, since in the documentation these are referred to as:

combiner - an associative, non-interfering, stateless function that accepts two partial result containers and merges them.

Well, that "partial" to me is an indication that the accumulator was called on them before they reached the combiner, but I just wanted to be sure.

Answer 1:

There is no guarantee that the accumulator has been applied to a container before merging. In other words, the lists to merge may be empty.

To demonstrate this:

IntStream.range(0, 10).parallel().boxed()
         .filter(i -> i >= 3 && i < 7)
         .collect(ArrayList::new, List::add, (l1,l2)->{
             System.out.println(l1.size()+" + "+l2.size());
             l1.addAll(l2);
         });

On my machine, it prints:

0 + 0
0 + 0
0 + 0
1 + 1
0 + 2
0 + 2
1 + 1
2 + 0
2 + 2

The workload splitting happens at the source, when the outcome of the filter operation is not known yet. Each chunk is processed the same way, without rechecking whether any element has arrived at the accumulator.

Mind that starting with Java 9, you can also do something like

IntStream.range(0, 10).parallel().boxed()
        .collect(Collectors.filtering(i -> i >= 3 && i < 7, Collectors.toList()));

which is another reason why a collector (here, the toList() collector) should be prepared to encounter empty containers, as the filtering happens outside the Stream implementation and an accept call on the compound collector’s accumulator doesn’t always imply an accept call on the downstream collector’s accumulator.
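The effect can be made visible even without parallelism. The following is only a minimal sketch: countingToList and its AtomicInteger counter are hypothetical helpers introduced for illustration, standing in for toList() so that calls to the downstream accumulator can be counted. It assumes Java 9 or newer for Collectors.filtering.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class FilteringDownstreamDemo {

    // Hypothetical helper: a list collector that counts how often
    // its accumulator is invoked.
    static Collector<Integer, List<Integer>, List<Integer>> countingToList(AtomicInteger calls) {
        return Collector.of(
            ArrayList::new,
            (list, elem) -> { calls.incrementAndGet(); list.add(elem); },
            (left, right) -> { left.addAll(right); return left; });
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Sequential stream in which every element is rejected by the predicate:
        // the compound accumulator runs three times, the downstream one never.
        List<Integer> result = IntStream.range(0, 3).boxed()
            .collect(Collectors.filtering(i -> i >= 3 && i < 7, countingToList(calls)));
        System.out.println(result + " after " + calls.get() + " downstream accumulator calls");
    }
}
```

This prints "[] after 0 downstream accumulator calls": the downstream container is created and finished without its accumulator ever being called.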

The requirement of being able to handle empty containers is specified in the Collector documentation:

To ensure that sequential and parallel executions produce equivalent results, the collector functions must satisfy an identity and an associativity constraints.

The identity constraint says that for any partially accumulated result, combining it with an empty result container must produce an equivalent result. That is, for a partially accumulated result a that is the result of any series of accumulator and combiner invocations, a must be equivalent to combiner.apply(a, supplier.get()).
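Applied to the splitting example from the question, one possible way to satisfy this constraint is sketched below. This is not the code from the question: splitAt and identityHolds are hypothetical names, and the sketch assumes a slightly different container design in which the supplier already returns a container holding one open (empty) group, so that a container the accumulator never touched merges as a no-op.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;

class SplitCollectorSketch {

    // Hypothetical helper: splits a stream of integers at every separator.
    static Collector<Integer, List<List<Integer>>, List<List<Integer>>> splitAt(int separator) {
        return Collector.of(
            // Each container starts with one open group, so the combiner can
            // always address a "last" group on the left and a "first" on the right.
            () -> {
                List<List<Integer>> groups = new ArrayList<>();
                groups.add(new ArrayList<>());
                return groups;
            },
            (groups, elem) -> {
                if (elem == separator) {
                    groups.add(new ArrayList<>());           // separator opens a new group
                } else {
                    groups.get(groups.size() - 1).add(elem); // extend the current group
                }
            },
            (left, right) -> {
                // The first group of 'right' continues the last group of 'left';
                // an untouched 'right' ([[]]) therefore changes nothing.
                left.get(left.size() - 1).addAll(right.get(0));
                left.addAll(right.subList(1, right.size()));
                return left;
            });
    }

    // Checks combiner.apply(a, supplier.get()) against a for one input sequence.
    static boolean identityHolds(List<Integer> input, int separator) {
        Collector<Integer, List<List<Integer>>, List<List<Integer>>> c = splitAt(separator);
        List<List<Integer>> a = c.supplier().get();
        List<List<Integer>> expected = c.supplier().get();
        for (Integer i : input) {
            c.accumulator().accept(a, i);
            c.accumulator().accept(expected, i);
        }
        return c.combiner().apply(a, c.supplier().get()).equals(expected);
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 2, 8);
        System.out.println(input.stream().collect(splitAt(2)));          // sequential
        System.out.println(input.parallelStream().collect(splitAt(2)));  // parallel
        System.out.println(identityHolds(input, 2));
    }
}
```

Both collect calls print [[1], [3, 4], [8]], and the identity check prints true. Note that, unlike the accumulator in the question, this sketch keeps empty groups for leading, trailing, or consecutive separators; whether that is desired depends on the use case.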