Java 8: how to aggregate objects from a stream?

Posted 2019-09-19 09:17

Question:

Edit

IMHO this is not a duplicate, because the two questions approach the problem in different ways, and especially because they call for entirely different technical skills (and, finally, because I am asking myself both questions).

Question

How can I aggregate items from an ordered stream, preferably in an intermediate operation?

Context

This follows up on my other question: "Java8 stream lines and aggregate with action on terminal line".

I have a very large file of the form:

MASTER_REF1
    SUBREF1
    SUBREF2
    SUBREF3
MASTER_REF2
MASTER_REF3
    SUBREF1
    ...

Where each SUBREF (if any) belongs to the MASTER_REF above it, and both are complex objects (you can think of them as something like JSON).

My first attempt grouped the lines with an operation that returns null while it is still aggregating and returns a value once a complete group of lines has been found (a "group" of lines ends when a line satisfies line.charAt(0) != ' ', i.e. when the next MASTER_REF starts).

This code is hard to read and requires an extra .filter(Objects::nonNull).
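A minimal sketch of that null-returning approach, assuming the lines arrive as a sequential Stream<String>, the file starts with a master line and contains no blank lines. The class names and the "<EOF>" sentinel (appended so that the last group is also emitted) are hypothetical; this is a reconstruction for illustration, not the asker's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;

// Hypothetical reconstruction of the "return null while aggregating" idea.
// Only correct for sequential, in-order streams because map() carries state.
final class NullGrouping {

    // Hypothetical sentinel: a non-indented line that closes the final group
    static final String SENTINEL = "<EOF>";

    static Stream<List<String>> group(Stream<String> lines) {
        Accumulator acc = new Accumulator();
        return Stream.concat(lines, Stream.of(SENTINEL))
            .map(acc::accept)            // null while a group is still being built
            .filter(Objects::nonNull);   // drop the null placeholders
    }

    private static final class Accumulator {
        // Group currently being built; assumes the input starts with a master line
        private List<String> current;

        List<String> accept(String line) {
            // A non-indented line ends the previous group (assumes no blank lines)
            if (line.charAt(0) != ' ') {
                List<String> finished = current;
                current = new ArrayList<>();
                current.add(line);
                return finished;         // null for the very first master line
            }
            current.add(line);           // sub-reference: keep aggregating
            return null;
        }
    }
}
```

The stateful map step is what makes this style awkward: its correctness depends on sequential, in-order evaluation, and every line that does not close a group produces a null that must be filtered out afterwards.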

I think one could achieve this with .collect(groupingBy(...)) or .reduce(...), but those are terminal operations, which are:

  • not required in my case: the lines are ordered and should be grouped by their position, and the groups of lines are to be transformed afterwards (map + filter + ... + forEach);
  • not a good idea either: I'm talking about a huge data file, far bigger than the total amount of RAM + swap, so a terminal operation would exhaust the available resources (as said, by design I need to keep the groups in memory because they are to be transformed afterwards).

Answer 1:

As I already noted in my answer to your previous question, you can use third-party libraries that provide partial reduction operations. One such library is StreamEx, which I develop myself.

In the StreamEx library, a partial reduction is an intermediate stream operation that combines several consecutive input elements while some condition holds. Usually the condition is specified as a BiPredicate applied to each pair of adjacent stream elements, which returns true when the two elements should be combined. The simplest way to combine elements is to collect them into a List with the StreamEx.groupRuns() method, like this:

Stream<List<String>> records = StreamEx.of(Files.lines(path))
    .groupRuns((line1, line2) -> !line2.startsWith("MASTER"));

Here we start a new record when the second of two adjacent lines starts with "MASTER" (as in your example). Otherwise we continue the previous record.
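To illustrate these adjacent-pair semantics with the plain JDK only, here is a minimal sketch of a hypothetical groupRuns helper. The class Runs and its method are assumptions: they are not part of StreamEx or the JDK, and this is not how StreamEx implements the operation. It wraps the source iterator so that each run is built only when the downstream pipeline asks for it, and it is written for sequential use.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiPredicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical plain-JDK helper mimicking "combine adjacent elements while the predicate holds".
final class Runs {

    static <T> Stream<List<T>> groupRuns(Stream<T> source, BiPredicate<? super T, ? super T> sameGroup) {
        Iterator<T> it = source.iterator();   // pulls source elements lazily, on demand
        Iterator<List<T>> groups = new Iterator<List<T>>() {
            private T pending;                // first element of the next run, already read
            private boolean hasPending;

            @Override
            public boolean hasNext() {
                return hasPending || it.hasNext();
            }

            @Override
            public List<T> next() {
                if (!hasNext()) throw new NoSuchElementException();
                T prev = hasPending ? pending : it.next();
                hasPending = false;
                pending = null;
                List<T> run = new ArrayList<>();
                run.add(prev);
                while (it.hasNext()) {
                    T cur = it.next();
                    if (sameGroup.test(prev, cur)) {   // adjacent pair belongs together
                        run.add(cur);
                        prev = cur;
                    } else {                           // cur starts a new run: keep it for later
                        pending = cur;
                        hasPending = true;
                        break;
                    }
                }
                return run;
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(groups, Spliterator.ORDERED), false)
            .onClose(source::close);          // closing the result also closes the source
    }
}
```

With this sketch, Runs.groupRuns(Files.lines(path), (line1, line2) -> !line2.startsWith("MASTER")) would yield the same records as the StreamEx snippet. Because runs are pulled one at a time, it also works on an infinite source followed by limit(...); the cost is one element of look-ahead held between runs.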

Note that such a stream is still lazy: in sequential processing, at most one intermediate List<String> is created at a time. Parallel processing is also supported, though switching the Files.lines stream to parallel mode rarely improves performance (at least prior to Java 9).