Is it possible to do a lazy groupby, returning a s

2019-05-02 11:44发布

I have some large-ish text files that I want to process by grouping its lines.

I tried to use the new streaming features, like

return FileUtils.readLines(...) 
            .parallelStream()
            .map(...)
            .collect(groupingBy(pair -> pair[0]));

The problem is that, AFAIK, this generates a Map.

Is there any way to have high level code like the one above that generates, for example, a Stream of Entries?

UPDATE: What I'm looking for is something like python's itertools.groupby. My files are already sorted (by pair[0]), I just want to load the groups one by one.

I already have an iterative solution. I'm just wondering if there's a more declarative way to do that. Btw, using guava or another 3rd party library wouldn't be a big problem.

3条回答
Anthone
2楼-- · 2019-05-02 12:07

cyclops-react, I library a contribute to, offers both sharding and grouping funcitonality that might do what you want.

  ReactiveSeq<ListX<TYPE>> grouped = ReactiveSeq.fromCollection(FileUtils.readLines(...) )
             .groupedStatefullyWhile((batch,next) ->  batch.size()==0 ? true : next.equals(batch.get(0)));

The groupedStatefullyWhile operator allows elements to be grouped based on the current state of the batch. ReactiveSeq is a single threaded sequential Stream.

  Map<Key, Stream<Value> sharded = 
                  new LazyReact()
                 .fromCollection(FileUtils.readLines(...) )
                 .map(..)
                 .shard(shards, pair -> pair[0]);

This will create a LazyFutureStream (that implements java.util.stream.Stream), that will process the data in the file asynchronously and in parallel. It's lazy and won't start processing until data is pulled through.

The only caveat is that you need to define the shards beforehand. I.e. the 'shards' parameter above which is a Map of async.Queue's keyed by the key to the shard (possibly whatever pair[0] is?).

e.g.

Map<Integer,Queue<String>> shards;

There is a sharding example with video here and test code here

查看更多
Luminary・发光体
3楼-- · 2019-05-02 12:13

It can be done by collapse with StreamEx

final int[][] aa = { { 1, 1 }, { 1, 2 }, { 2, 2 }, { 2, 3 }, { 3, 3 }, { 4, 4 } };

StreamEx.of(aa)
        .collapse((a, b) -> a[0] == b[0], Collectors.groupingBy(a -> a[0]))
        .forEach(System.out::println);

We can add peek and limit to verify if it's lazy calculation:

StreamEx.of(aa)
        .peek(System.out::println)
        .collapse((a, b) -> a[0] == b[0], Collectors.groupingBy(a -> a[0]))
        .limit(1)
        .forEach(System.out::println);
查看更多
可以哭但决不认输i
4楼-- · 2019-05-02 12:14

The task you want to achieve is quite different from what grouping does. groupingBy does not rely on the order of the Stream’s elements but on the Map’s algorithm applied to the classifier Function’s result.

What you want is to fold adjacent items having a common property value into one List item. It is not even necessary to have the Stream sorted by that property as long as you can guaranty that all items having the same property value are clustered.

Maybe it is possible to formulate this task as a reduction but to me the resulting structure looks too complicated.

So, unless direct support for this feature gets added to the Streams, an iterator based approach looks most pragmatic to me:

class Folding<T,G> implements Spliterator<Map.Entry<G,List<T>>> {
    static <T,G> Stream<Map.Entry<G,List<T>>> foldBy(
            Stream<? extends T> s, Function<? super T, ? extends G> f) {
        return StreamSupport.stream(new Folding<>(s.spliterator(), f), false);
    }
    private final Spliterator<? extends T> source;
    private final Function<? super T, ? extends G> pf;
    private final Consumer<T> c=this::addItem;
    private List<T> pending, result;
    private G pendingGroup, resultGroup;

    Folding(Spliterator<? extends T> s, Function<? super T, ? extends G> f) {
        source=s;
        pf=f;
    }
    private void addItem(T item) {
        G group=pf.apply(item);
        if(pending==null) pending=new ArrayList<>();
        else if(!pending.isEmpty()) {
            if(!Objects.equals(group, pendingGroup)) {
                if(pending.size()==1)
                    result=Collections.singletonList(pending.remove(0));
                else {
                    result=pending;
                    pending=new ArrayList<>();
                }
                resultGroup=pendingGroup;
            }
        }
        pendingGroup=group;
        pending.add(item);
    }
    public boolean tryAdvance(Consumer<? super Map.Entry<G, List<T>>> action) {
        while(source.tryAdvance(c)) {
            if(result!=null) {
                action.accept(entry(resultGroup, result));
                result=null;
                return true;
            }
        }
        if(pending!=null) {
            action.accept(entry(pendingGroup, pending));
            pending=null;
            return true;
        }
        return false;
    }
    private Map.Entry<G,List<T>> entry(G g, List<T> l) {
        return new AbstractMap.SimpleImmutableEntry<>(g, l);
    }
    public int characteristics() { return 0; }
    public long estimateSize() { return Long.MAX_VALUE; }
    public Spliterator<Map.Entry<G, List<T>>> trySplit() { return null; }
}

The lazy nature of the resulting folded Stream can be best demonstrated by applying it to an infinite stream:

Folding.foldBy(Stream.iterate(0, i->i+1), i->i>>4)
       .filter(e -> e.getKey()>5)
       .findFirst().ifPresent(e -> System.out.println(e.getValue()));
查看更多
登录 后发表回答