How are collectors used when turning the stream parallel?

Published 2019-03-26 07:33

Question:

I actually tried to answer this question: How to skip even lines of a Stream<String> obtained from Files.lines. So I thought this collector wouldn't work well in parallel:

private static Collector<String, ?, List<String>> oddLines() {
    int[] counter = {1};
    return Collector.of(ArrayList::new,
            (l, line) -> {
                if (counter[0] % 2 == 1) l.add(line);
                counter[0]++;
            },
            (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            });
}

but it works.

EDIT: It didn't actually work; I got fooled by the fact that my input set was too small to trigger any parallelism; see discussion in comments.

I thought it wouldn't work because the two following execution plans came to mind.


1. The counter array is shared among all threads.

Thread t1 reads the first element of the stream, so the if condition is satisfied. It adds the first element to its list. Then execution stops before it has time to update the array value.

Thread t2, which, say, started at the 4th element of the stream, adds it to its list. So we end up with an unwanted element.

Of course, since this collector seems to work, I guess it doesn't work like that. And the updates are not atomic anyway.
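The non-atomicity alone is easy to demonstrate outside of collectors. As a minimal sketch (not from the original question; the class name is mine), a parallel stream doing unsynchronized increments on an int[] cell tends to lose updates on a multi-core machine:

```java
import java.util.stream.IntStream;

public class LostUpdates {
    public static void main(String[] args) {
        int[] counter = {0};
        // counter[0]++ is a read-modify-write; parallel threads can interleave,
        // read the same value, and overwrite each other's writes, losing increments
        IntStream.range(0, 1_000_000).parallel().forEach(i -> counter[0]++);
        // never more than 1_000_000, and usually noticeably less with real parallelism
        System.out.println(counter[0]);
    }
}
```

The result can only fall short of 1,000,000, never exceed it, because a race loses increments rather than duplicating them.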


2. Each Thread has its own copy of the array

In this case there is no problem with the update anymore, but nothing guarantees that thread t2 won't start at the 4th element of the stream. So it doesn't work like that either.


So it seems it doesn't work like that at all, which brings me to the question... how is the collector used in parallel?

Can someone explain to me, basically, how it works and why my collector works when run in parallel?

Thank you very much!

Answer 1:

Passing a parallel() source stream into your collector is enough to break the logic, because your shared state (counter) may be incremented from different tasks. You can verify that, because it never returns the correct result for any finite stream input:

    Stream<String> lines = IntStream.range(1, 20000).mapToObj(i -> i + "");
    System.out.println(lines.isParallel());
    lines = lines.parallel();
    System.out.println(lines.isParallel());

    List<String> collected = lines.collect(oddLines());

    System.out.println(collected.size());

Note that for infinite streams (e.g. when reading from Files.lines()) you need to generate some significant amount of data in the stream, so it actually forks a task to run some chunks concurrently.

Output for me is:

false
true
12386

Which is clearly wrong.


As @Holger correctly pointed out in the comments, a different race can happen when your collector specifies CONCURRENT and UNORDERED. In that case the tasks operate on a single shared collection (ArrayList::new is called once per stream), whereas with only parallel() the accumulator runs on a separate collection per task and the results are later combined using your defined combiner.
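For illustration, here is a sketch (class and method names are mine) of what adding those characteristics to the question's collector looks like. With CONCURRENT and UNORDERED declared, the framework is allowed to call the supplier once and accumulate into that single non-thread-safe ArrayList from several threads, which is what produces the ArrayIndexOutOfBoundsException below:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;

public class CharacteristicsDemo {
    // the question's collector, but declared CONCURRENT and UNORDERED;
    // note that Collector.of without a finisher also adds IDENTITY_FINISH
    private static Collector<String, ?, List<String>> oddLinesConcurrent() {
        int[] counter = {1};
        return Collector.of(ArrayList::new,
                (l, line) -> {
                    if (counter[0] % 2 == 1) l.add(line);
                    counter[0]++;
                },
                (l1, l2) -> { l1.addAll(l2); return l1; },
                Collector.Characteristics.CONCURRENT,
                Collector.Characteristics.UNORDERED);
    }

    public static void main(String[] args) {
        // shows which characteristics the stream framework will see
        System.out.println(oddLinesConcurrent().characteristics());
    }
}
```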

If you'd add the characteristics to the collector, you might run into the following result due to the shared state in a single collection:

false
true
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 73
    at java.util.ArrayList.add(ArrayList.java:459)
    at de.jungblut.stuff.StreamPallel.lambda$0(StreamPallel.java:18)
    at de.jungblut.stuff.StreamPallel$$Lambda$3/1044036744.accept(Unknown Source)
    at java.util.stream.ReferencePipeline.lambda$collect$207(ReferencePipeline.java:496)
    at java.util.stream.ReferencePipeline$$Lambda$6/2003749087.accept(Unknown Source)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
    at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
    at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:496)
    at de.jungblut.stuff.StreamPallel.main(StreamPallel.java:32)
12386


Answer 2:

Actually it's just a coincidence that this collector works. It doesn't work with a custom data source. Consider this example:

List<String> list = IntStream.range(0, 10).parallel().mapToObj(String::valueOf)
        .collect(oddLines());
System.out.println(list);

This produces a different result every time. The real cause is that the BufferedReader.lines() stream is split into batches of at least java.util.Spliterators.IteratorSpliterator.BATCH_UNIT lines, which is 1024. If you have a substantially bigger number of lines, it may fail even with BufferedReader:

String data = IntStream.range(0, 10000).mapToObj(String::valueOf)
    .collect(Collectors.joining("\n"));
List<String> list = new BufferedReader(new StringReader(data)).lines().parallel()
    .collect(oddLines());
list.stream().mapToInt(Integer::parseInt).filter(x -> x%2 != 0)
    .forEach(System.out::println);

Were the collector working normally, this would print nothing. But sometimes it does print.
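One way to skip even lines that stays correct in parallel (a sketch under the assumption that the lines fit in memory; the class name is mine) is to filter on the position via IntStream.range instead of a shared mutable counter, since each element then knows its own index:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class OddLinesByIndex {
    public static void main(String[] args) {
        // stand-in for lines read from a file: "0", "1", ..., "9"
        List<String> lines = IntStream.range(0, 10)
                .mapToObj(String::valueOf)
                .collect(Collectors.toList());

        // filter by index, not by shared state: 0-based indices 0, 2, 4, ...
        // are the 1st, 3rd, 5th, ... lines; IntStream.range splits cleanly
        // and the ordered collect preserves encounter order even in parallel
        List<String> oddLines = IntStream.range(0, lines.size())
                .parallel()
                .filter(i -> i % 2 == 0)
                .mapToObj(lines::get)
                .collect(Collectors.toList());

        System.out.println(oddLines); // [0, 2, 4, 6, 8]
    }
}
```

Because no state is shared between tasks, this gives the same result no matter how the stream is split.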