I'm trying to aggregate a PCollection<String> into a PCollection<List<String>> with ~60 elements each.
The lists will be sent to an API that accepts up to 60 elements per request. Currently I'm doing this with windowing, but the only count-based trigger is elementCountAtLeast, so I have to collect the elements into a list, count them again, and split the list if it is too long. This is quite cumbersome and produces a lot of lists with just a few elements:
Repeatedly.forever(AfterFirst.of(
AfterPane.elementCountAtLeast(maxNrOfelementsPerList),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1)))))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes())
.apply("CollectIntoLists", Combine.globally(new StringToListCombinator()).withoutDefaults())
.apply("SplitListsToMaxSize", ParDo.of(new DoFn<List<String>, List<String>>() {
@ProcessElement
public void apply(ProcessContext pc) {
splitList(pc.element(), maxNrOfelementsPerList).forEach(pc::output);
}
}));
Is there any direct and more consistent way to do this aggregation?
This can be built using the State API in Dataflow 2.x.
Basically, you would write a stateful DoFn that has two pieces of state: a count of the number of elements, and a "bag" of the elements that have been buffered so far.
When an element arrives, you add it to the bag and increment the count. You then check the count, and if it has reached 60, you output the contents of the bag as one list and clear both pieces of state.
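To make the buffering logic concrete, here is a minimal plain-Java sketch of what the state for a single key would do. It is only a model, not Beam API: the class BatchBuffer and its methods are hypothetical names, the two fields stand in for the count state and the bag state, and in a real stateful DoFn the same steps would run inside the @ProcessElement method. The flush() method is my own addition for the leftover elements that never reach a full batch; in a pipeline you would probably need a timer for that.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical plain-Java model of the state kept for ONE key (not Beam API).
final class BatchBuffer {
    private final int batchSize;
    private final List<String> bag = new ArrayList<>(); // stands in for the "bag" state
    private int count = 0;                              // stands in for the count state

    BatchBuffer(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    // Buffers one element; returns a full batch once the count reaches batchSize.
    Optional<List<String>> add(String element) {
        bag.add(element);
        count++;
        if (count < batchSize) {
            return Optional.empty();
        }
        List<String> batch = new ArrayList<>(bag);
        bag.clear();   // clear both pieces of state after emitting
        count = 0;
        return Optional.of(batch);
    }

    // Assumption: emits whatever is left over, e.g. when no more input is expected.
    List<String> flush() {
        List<String> rest = new ArrayList<>(bag);
        bag.clear();
        count = 0;
        return rest;
    }

    public static void main(String[] args) {
        BatchBuffer buffer = new BatchBuffer(3);
        for (String s : new String[] {"a", "b", "c", "d", "e", "f", "g"}) {
            buffer.add(s).ifPresent(batch -> System.out.println("batch: " + batch));
        }
        System.out.println("leftover: " + buffer.flush());
    }
}
```

With this shape no list can ever be longer than the batch size, so the separate split step from the question is no longer needed.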
Since each key of a Stateful DoFn will run on a single machine, it would probably be good to randomly distribute your elements across N keys, so that you can scale up to N machines (multiple keys may run on one machine).
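The key distribution can be sketched in plain Java as well. Again this is only a model under my own assumptions: ShardedBatcher, withRandomKeys and bufferedCount are hypothetical names, the map stands in for the per-key state that the runner would manage, and in a pipeline the random key would be attached in a step before the stateful DoFn.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.IntSupplier;

// Hypothetical model: each element gets one of N keys, each key has its own buffer.
final class ShardedBatcher {
    private final int batchSize;
    private final IntSupplier keyPicker;
    private final Map<Integer, List<String>> buffers = new HashMap<>();

    ShardedBatcher(int batchSize, IntSupplier keyPicker) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.keyPicker = keyPicker;
    }

    // Picks a key uniformly at random from [0, numKeys) for every element.
    static ShardedBatcher withRandomKeys(int numKeys, int batchSize) {
        if (numKeys <= 0) {
            throw new IllegalArgumentException("numKeys must be positive");
        }
        return new ShardedBatcher(batchSize, () -> ThreadLocalRandom.current().nextInt(numKeys));
    }

    // Buffers the element under its key; returns a full batch or an empty list.
    List<String> add(String element) {
        int key = keyPicker.getAsInt();
        List<String> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(element);
        if (buffer.size() < batchSize) {
            return Collections.emptyList();
        }
        buffers.remove(key); // clear this key's state after emitting
        return buffer;
    }

    // Number of elements still waiting in partially filled buffers.
    int bufferedCount() {
        int total = 0;
        for (List<String> buffer : buffers.values()) {
            total += buffer.size();
        }
        return total;
    }
}
```

One trade-off to keep in mind: every key fills its own buffer independently, so with N keys and a batch size of 60 up to N * 59 elements can be sitting in incomplete batches at any time. A larger N gives more parallelism but also more partially filled buffers.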