How to correctly reduce a stream to another stream

Posted 2019-05-08 18:07

Question:

I have a stream of strings and nulls, like

Stream<String> str1 = Stream.of("A","B","C",null,null,"D",null,"E","F",null,"G",null);

I want to reduce it to another stream in which every sequence of non-null strings ending with a null is joined together (two consecutive nulls yield an empty string), i.e.:

Stream<String> str2 = Stream.of("ABC", "", "D", "EF","G");
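To pin down the rule before looking at stream-based solutions, here is a minimal eager sketch over a List. The names GroupingSpec and groupRunsEagerly are hypothetical, and emitting a trailing run that is not followed by a null is an assumption (the sample input always ends with null, so the question does not settle it).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class GroupingSpec {
    // Hypothetical helper: an eager, list-based statement of the grouping rule.
    // Every null closes the current group (which may be empty); a trailing run
    // without a terminating null is also emitted (assumption).
    static List<String> groupRunsEagerly(List<String> input) {
        List<String> out = new ArrayList<>();
        StringBuilder sb = new StringBuilder();
        boolean open = false; // true if a non-null element was seen since the last null
        for (String s : input) {
            if (s == null) {
                out.add(sb.toString());
                sb.setLength(0);
                open = false;
            } else {
                sb.append(s);
                open = true;
            }
        }
        if (open) out.add(sb.toString()); // trailing group without a terminating null
        return out;
    }

    public static void main(String[] args) {
        List<String> in = Arrays.asList("A","B","C",null,null,"D",null,"E","F",null,"G",null);
        System.out.println(groupRunsEagerly(in)); // prints [ABC, , D, EF, G]
    }
}
```

This version is only a reference for the expected output; it is exactly the kind of "process everything first" approach the question wants to avoid.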

The first way I found is to create a collector that first reduces the complete input stream to a single object holding a list of all joined strings, and then creates a new stream from it:

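import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

class Acc1 {
  private final List<String> data = new ArrayList<>();  // completed groups
  private final StringBuilder sb = new StringBuilder(); // group being built

  private void accept(final String s) {
    if (s != null) 
      sb.append(s);
    else {
      // a null terminates the current group
      data.add(sb.toString());
      sb.setLength(0);
    }
  }

  public static Collector<String,Acc1,Stream<String>> collector() {
    // note: the combiner (a,b) -> a discards b, so this only works for sequential streams
    return Collector.of(Acc1::new, Acc1::accept, (a,b)-> a, acc -> acc.data.stream());
  }
}
...
Stream<String> str2 = str1.collect(Acc1.collector());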

But in this case, before any use of str2, even str2.findFirst(), the input stream is processed completely. This is a time- and memory-consuming operation, and on an infinite stream from some generator it will not work at all.
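The eagerness can be made visible with a counter in peek(). A small sketch, assuming the question's Acc1 collector (copied into a nested class so the block compiles on its own); EagerCollectorDemo and consumedBeforeFirst are hypothetical names. All 12 source elements are pulled by collect() before findFirst() on the resulting stream can return anything.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.Stream;

class EagerCollectorDemo {
    // Same logic as Acc1 in the question.
    static final class Acc1 {
        private final List<String> data = new ArrayList<>();
        private final StringBuilder sb = new StringBuilder();

        private void accept(String s) {
            if (s != null) {
                sb.append(s);
            } else {
                data.add(sb.toString());
                sb.setLength(0);
            }
        }

        static Collector<String, Acc1, Stream<String>> collector() {
            // combiner (a, b) -> a as in the question: sequential use only
            return Collector.of(Acc1::new, Acc1::accept, (a, b) -> a, acc -> acc.data.stream());
        }
    }

    // Returns how many source elements were pulled before the first group was available.
    static int consumedBeforeFirst() {
        AtomicInteger consumed = new AtomicInteger();
        Stream<String> str2 = Stream.of("A","B","C",null,null,"D",null,"E","F",null,"G",null)
                .peek(s -> consumed.incrementAndGet())
                .collect(Acc1.collector()); // terminal operation: runs over the whole source here
        System.out.println(str2.findFirst().orElse(null)); // prints ABC
        return consumed.get(); // 12
    }
}
```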

Another way is to create an external object that keeps the intermediate state, and to use it in flatMap():

import java.util.stream.Stream;

class Acc2 {
  private final StringBuilder sb = new StringBuilder(); // group being built

  Stream<String> accept(final String s) {
    if (s != null) {
      sb.append(s);
      return Stream.empty();        // group not finished yet: emit nothing
    } else {
      final String result = sb.toString();
      sb.setLength(0);
      return Stream.of(result);     // a null ends the group: emit it
    }
  }
}
...
Acc2 acc = new Acc2();
Stream<String> str2 = str1.flatMap(acc::accept);

In this case, only the elements of str1 that are actually needed by str2 are retrieved.
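A sketch of that laziness, assuming the question's Acc2 (copied here) and a peek() counter; LazyFlatMapDemo and consumedForFirstGroup are hypothetical names. Asking only for the first group pulls the four elements "A", "B", "C" and the first null, and nothing more.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class LazyFlatMapDemo {
    // Same logic as Acc2 in the question: the state lives outside the stream.
    static final class Acc2 {
        private final StringBuilder sb = new StringBuilder();

        Stream<String> accept(String s) {
            if (s != null) {
                sb.append(s);
                return Stream.empty();
            }
            String result = sb.toString();
            sb.setLength(0);
            return Stream.of(result);
        }
    }

    // Pulls just the first group and reports how many source elements were consumed.
    static int consumedForFirstGroup() {
        AtomicInteger consumed = new AtomicInteger();
        Acc2 acc = new Acc2();
        Optional<String> first = Stream.of("A","B","C",null,null,"D",null,"E","F",null,"G",null)
                .peek(s -> consumed.incrementAndGet())
                .flatMap(acc::accept)
                .findFirst(); // short-circuits once "ABC" has been emitted
        System.out.println(first.get() + " after " + consumed.get() + " elements");
        return consumed.get();
    }
}
```

Each inner stream here holds at most one element, so this also holds on Java 8, where flatMap does not short-circuit inside an inner stream.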

But using an external object created outside of the stream processing looks ugly to me, and it may cause side effects that I do not see now. Also, if str2 is later processed in parallel (for example via str2.parallel()), the result will be unpredictable.
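One concrete side effect of the shared state, shown as a sketch under the assumption that the same Acc2 instance is reused for two pipelines (SharedStateDemo and reuseAcrossStreams are hypothetical names): a run that is not terminated by a null is silently dropped from the first stream and then leaks into the first group of the second one.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SharedStateDemo {
    // Same logic as Acc2 in the question.
    static final class Acc2 {
        private final StringBuilder sb = new StringBuilder();

        Stream<String> accept(String s) {
            if (s != null) {
                sb.append(s);
                return Stream.empty();
            }
            String result = sb.toString();
            sb.setLength(0);
            return Stream.of(result);
        }
    }

    static List<List<String>> reuseAcrossStreams() {
        Acc2 acc = new Acc2(); // one instance shared by two pipelines
        // No terminating null: "A" and "B" stay buffered in acc and are never emitted.
        List<String> first = Stream.of("A", "B").flatMap(acc::accept).collect(Collectors.toList());
        // The leftover "AB" is prepended to the first group of the next pipeline.
        List<String> second = Stream.of("C", null).flatMap(acc::accept).collect(Collectors.toList());
        return Arrays.asList(first, second); // [[], [ABC]]
    }
}
```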

Is there a more correct implementation of a stream-to-stream reduction without these flaws?

Answer 1:

Reduction or its mutable variant, collect, is always an operation that will process all items. Your operation can be implemented via a custom Spliterator, e.g.

import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public static Stream<String> joinGroups(Stream<String> s) {
    Spliterator<String> sp=s.spliterator();
    return StreamSupport.stream(
        new Spliterators.AbstractSpliterator<String>(sp.estimateSize(), 
        sp.characteristics()&Spliterator.ORDERED | Spliterator.NONNULL) {
            private StringBuilder sb = new StringBuilder();
            private String last;

            @Override
            public boolean tryAdvance(Consumer<? super String> action) {
                // source exhausted: no more groups
                if(!sp.tryAdvance(str -> last=str))
                    return false;
                // append non-null elements until a null (end of group) or the end of the source
                while(last!=null) {
                    sb.append(last);
                    if(!sp.tryAdvance(str -> last=str)) break;
                }
                action.accept(sb.toString());
                sb=new StringBuilder();
                return true;
            }
        }, false);
}

which produces the intended groups, as you can test with

joinGroups(Stream.of("A","B","C",null,null,"D",null,"E","F",null,"G",null))
    .forEach(System.out::println);

but also has the desired lazy behavior, testable via

joinGroups(
    Stream.of("A","B","C",null,null,"D",null,"E","F",null,"G",null)
          .peek(str -> System.out.println("consumed "+str))
).skip(1).filter(s->!s.isEmpty()).findFirst().ifPresent(System.out::println);
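The question also mentions infinite streams from a generator. A sketch, assuming the first joinGroups variant above (copied so the block compiles on its own) and a hypothetical generator that cycles "x", "y", null forever; InfiniteJoinDemo and firstThreeGroups are illustrative names. limit(3) stops after three groups, which only terminates because joinGroups pulls source elements on demand; the Acc1 collector would never return on this input.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class InfiniteJoinDemo {
    // First joinGroups variant from this answer.
    static Stream<String> joinGroups(Stream<String> s) {
        Spliterator<String> sp = s.spliterator();
        return StreamSupport.stream(
            new Spliterators.AbstractSpliterator<String>(sp.estimateSize(),
                    sp.characteristics() & Spliterator.ORDERED | Spliterator.NONNULL) {
                private StringBuilder sb = new StringBuilder();
                private String last;

                @Override
                public boolean tryAdvance(Consumer<? super String> action) {
                    if (!sp.tryAdvance(str -> last = str)) return false;
                    while (last != null) {
                        sb.append(last);
                        if (!sp.tryAdvance(str -> last = str)) break;
                    }
                    action.accept(sb.toString());
                    sb = new StringBuilder();
                    return true;
                }
            }, false);
    }

    // Hypothetical infinite source: "x", "y", null, "x", "y", null, ...
    static List<String> firstThreeGroups() {
        Stream<String> infinite = Stream.iterate(0, i -> i + 1)
                .map(i -> i % 3 == 2 ? null : (i % 3 == 0 ? "x" : "y"));
        return joinGroups(infinite).limit(3).collect(Collectors.toList());
    }
}
```

Calling firstThreeGroups() returns [xy, xy, xy] after reading nine source elements.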

On second thought, I came up with this slightly more efficient variant. It uses a StringBuilder only if there are at least two Strings to join; otherwise, it simply uses the already existing sole String instance, or the literal "" for empty groups:

public static Stream<String> joinGroups(Stream<String> s) {
    Spliterator<String> sp=s.spliterator();
    return StreamSupport.stream(
        new Spliterators.AbstractSpliterator<String>(sp.estimateSize(), 
        sp.characteristics()&Spliterator.ORDERED | Spliterator.NONNULL) {
            private String next;

            @Override
            public boolean tryAdvance(Consumer<? super String> action) {
                if(!sp.tryAdvance(str -> next=str))
                    return false;
                String string=next;
                if(string==null) string="";  // the group is empty (the element is a null)
                else if(sp.tryAdvance(str -> next=str) && next!=null) {
                    // at least two strings to join: only now create a StringBuilder
                    StringBuilder sb=new StringBuilder().append(string);
                    do sb.append(next);while(sp.tryAdvance(str -> next=str) && next!=null);
                    string=sb.toString();
                }
                // otherwise a single string is emitted as-is, without copying
                action.accept(string);
                return true;
            }
        }, false);
}


Answer 2:

It's quite hard to implement such scenarios using the standard Stream API. In my free StreamEx library, I extended the standard Stream interface with methods that allow performing so-called "partial reduction", which is exactly what is needed here:

StreamEx<String> str1 = StreamEx.of("A","B","C",null,null,"D",null,"E","F",null,"G",null);
Stream<String> str2 = str1.collapse((a, b) -> a != null,
                          MoreCollectors.filtering(Objects::nonNull, Collectors.joining()));
str2.map(x -> '"'+x+'"').forEach(System.out::println);

Output:

"ABC"
""
"D"
"EF"
"G"

The StreamEx.collapse() method performs a partial reduction of the stream using the supplied collector. The first argument is a predicate that is applied to two adjacent original items and should return true if they must be reduced together. Here we only require that the first element of the pair is not null ((a, b) -> a != null): this means that every group ends with a null, and a new group starts right after it. Next, the letters of each group must be joined together, which the standard Collectors.joining() collector does. However, we also need to filter out the nulls. We can do this with the MoreCollectors.filtering collector (the same collector is available since Java 9 as Collectors.filtering).

This implementation is completely lazy and quite friendly to parallel processing.
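To see what the predicate and the downstream collector compute without pulling in StreamEx, here is a hypothetical, eager, List-based stand-in for the grouping rule described above. It is not how StreamEx implements collapse() and, unlike collapse(), it is not lazy; collapseAdjacent and CollapseSemanticsDemo are illustrative names, and the block needs Java 9+ for Collectors.filtering.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.BiPredicate;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class CollapseSemanticsDemo {
    // Hypothetical eager helper: adjacent elements a, b stay in the same group
    // while sameGroup.test(a, b) is true; each group is reduced with downstream.
    static <T, R> List<R> collapseAdjacent(List<T> input,
                                          BiPredicate<? super T, ? super T> sameGroup,
                                          Collector<? super T, ?, R> downstream) {
        List<R> out = new ArrayList<>();
        List<T> group = new ArrayList<>();
        for (T t : input) {
            // compare the last element of the current group with the next element
            if (!group.isEmpty() && !sameGroup.test(group.get(group.size() - 1), t)) {
                out.add(group.stream().collect(downstream));
                group = new ArrayList<>();
            }
            group.add(t);
        }
        if (!group.isEmpty()) out.add(group.stream().collect(downstream));
        return out;
    }

    static List<String> demo() {
        List<String> in = Arrays.asList("A","B","C",null,null,"D",null,"E","F",null,"G",null);
        // Collectors.filtering drops the nulls before Collectors.joining() concatenates the rest.
        return collapseAdjacent(in, (a, b) -> a != null,
                Collectors.filtering(Objects::nonNull, Collectors.joining()));
    }
}
```

demo() returns [ABC, , D, EF, G], matching the StreamEx output above: the groups are [A, B, C, null], [null], [D, null], [E, F, null] and [G, null].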