Performing more than one reduction in a single pass

Published 2019-04-11 16:06

Question:

What is the idiom for performing more than one reduction in a single pass of a stream? Is it just to have one big reducer class, even if this violates SRP if more than one type of reduction computation is required?

Answer 1:

Presumably you want to avoid making multiple passes, as the pipeline stages might be expensive. Or you want to avoid collecting the intermediate values in order to run them through multiple collectors, since the cost of storing all the values might be too high.

As Brian Goetz noted, Collectors.summarizingInt will collect int values and perform multiple reductions on them, returning an aggregate structure called IntSummaryStatistics. There are similar collectors for summarizing double and long values.
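For instance, a one-pass summary of string lengths can be obtained like this (a minimal sketch using only standard-library calls):

```java
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.stream.Collectors;

public class SummaryDemo {
    public static void main(String[] args) {
        List<String> words = List.of("aardvark", "buffalo", "cockatoo");

        // One pass over the stream computes count, sum, min, max, and average.
        IntSummaryStatistics stats = words.stream()
            .collect(Collectors.summarizingInt(String::length));

        System.out.println(stats.getCount() + " " + stats.getMin() + " " + stats.getMax());
        // prints "3 7 8"
    }
}
```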

Unfortunately these perform only a fixed set of reductions, so if you want to do reductions different from what they do, you have to write your own collector.
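As a sketch of that approach, a custom collector built with Collector.of can fold two reductions into one accumulator. The minMaxLength helper below is our own illustration (not part of the JDK); it tracks both the minimum and maximum string length in a single pass:

```java
import java.util.List;
import java.util.stream.Collector;

public class MinMaxDemo {
    // Illustrative custom collector: accumulates into an int[2] holding [min, max].
    static Collector<String, int[], int[]> minMaxLength() {
        return Collector.of(
            () -> new int[] { Integer.MAX_VALUE, Integer.MIN_VALUE },
            (acc, s) -> {
                acc[0] = Math.min(acc[0], s.length());
                acc[1] = Math.max(acc[1], s.length());
            },
            // Combiner merges partial results from parallel substreams.
            (a, b) -> new int[] { Math.min(a[0], b[0]), Math.max(a[1], b[1]) });
    }

    public static void main(String[] args) {
        int[] result = List.of("bison", "crocodile", "antelope").stream()
                           .collect(minMaxLength());
        System.out.println(result[0] + " " + result[1]); // prints "5 9"
    }
}
```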

Here's a technique for using multiple, unrelated collectors in a single pass. We can use peek() to take a crack at every value going through the stream, passing it through undisturbed. The peek() operation takes a Consumer, so we need a way to adapt a Collector to a Consumer. The Consumer will be the Collector's accumulator function. But we also need to call the Collector's supplier function and store the object it creates for passing to the accumulator function. And we need a way to get the result out of the Collector. To do this, we'll wrap the Collector in a little helper class:

import java.util.function.Consumer;
import java.util.stream.Collector;

public class PeekingCollector<T,A,R> {
    final Collector<T,A,R> collector;
    final A acc;    // single accumulation container, created eagerly

    public PeekingCollector(Collector<T,A,R> collector) {
        this.collector = collector;
        this.acc = collector.supplier().get();
    }

    public Consumer<T> peek() {
        if (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
            return t -> collector.accumulator().accept(acc, t);
        else
            return t -> {
                synchronized (this) {
                    collector.accumulator().accept(acc, t);
                }
            };
    }

    public synchronized R get() {
        return collector.finisher().apply(acc);
    }
}

To use this, we first have to create the wrapped collector and hang onto it. Then we run the pipeline and call peek, passing the wrapped collector. Finally we call get on the wrapped collector to get its result. Here's a simple example that filters and sorts some words, while also grouping them by first letter:

    List<String> input = Arrays.asList(
        "aardvark", "crocodile", "antelope",
        "buffalo", "bustard", "cockatoo",
        "capybara", "bison", "alligator");

    PeekingCollector<String,?,Map<String,List<String>>> grouper =
        new PeekingCollector<>(groupingBy(s -> s.substring(0, 1)));

    List<String> output = input.stream()
                               .filter(s -> s.length() > 5)
                               .peek(grouper.peek())
                               .sorted()
                               .collect(toList());

    Map<String,List<String>> groups = grouper.get();
    System.out.println(output);
    System.out.println(groups);

Output is:

[aardvark, alligator, antelope, buffalo, bustard, capybara, cockatoo, crocodile]
{a=[aardvark, antelope, alligator], b=[buffalo, bustard], c=[crocodile, cockatoo, capybara]}

It's a bit cumbersome, as you have to write out the generic types for the wrapped collector (which is a bit unusual; they're often all inferred). But if the expense of processing or storing stream values is great enough, perhaps it's worth the trouble.

Finally, note that peek() can be called from multiple threads if the stream runs in parallel. For this reason, non-thread-safe collectors must be protected by a synchronized block; if the collector is thread-safe, we needn't synchronize around calling it. To determine which case applies, we check the collector's CONCURRENT characteristic. If you run a parallel stream, prefer placing a concurrent collector (such as groupingByConcurrent or toConcurrentMap) within the peek operation; otherwise the synchronization inside the wrapped collector may become a bottleneck and slow down the entire stream.
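As a sketch of the parallel case, the earlier example can be rerun with parallelStream() and groupingByConcurrent. Since that collector reports the CONCURRENT characteristic, peek() hands back the unsynchronized accumulator. The wrapper class is reproduced here only so the sketch compiles on its own:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.stream.Collector;
import static java.util.stream.Collectors.*;

public class ParallelPeekDemo {
    // Same wrapper as shown above, repeated so this example is self-contained.
    static class PeekingCollector<T,A,R> {
        final Collector<T,A,R> collector;
        final A acc;

        PeekingCollector(Collector<T,A,R> collector) {
            this.collector = collector;
            this.acc = collector.supplier().get();
        }

        Consumer<T> peek() {
            if (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                return t -> collector.accumulator().accept(acc, t);
            else
                return t -> { synchronized (this) { collector.accumulator().accept(acc, t); } };
        }

        synchronized R get() {
            return collector.finisher().apply(acc);
        }
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("aardvark", "crocodile", "antelope",
                                           "buffalo", "bustard", "cockatoo");

        // groupingByConcurrent is CONCURRENT, so worker threads accumulate into
        // the ConcurrentMap directly, with no synchronized block in the way.
        PeekingCollector<String,?,ConcurrentMap<String,List<String>>> grouper =
            new PeekingCollector<>(groupingByConcurrent(s -> s.substring(0, 1)));

        List<String> sorted = input.parallelStream()
                                   .peek(grouper.peek())
                                   .sorted()
                                   .collect(toList());

        System.out.println(sorted);
        System.out.println(grouper.get());
    }
}
```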