I'm trying to aggregate a `PCollection<String>` into a `PCollection<List<String>>` with ~60 elements per list.
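For illustration, here is the target grouping expressed in plain Java rather than Beam. A buffer emits a list as soon as it holds the maximum number of elements, so no list ever exceeds the limit, and an explicit flush emits any remainder. In the pipeline, the one-minute processing-time trigger plays that flushing role. `Batcher`, `add` and `flush` are hypothetical names. This single-threaded sketch ignores windows, parallel workers and fault tolerance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: emits a batch exactly when it reaches maxSize,
// so no emitted batch is ever larger than the limit.
class Batcher<T> {
    private final int maxSize;
    private final Consumer<List<T>> sink;
    private List<T> buffer = new ArrayList<>();

    Batcher(int maxSize, Consumer<List<T>> sink) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        this.maxSize = maxSize;
        this.sink = sink;
    }

    void add(T element) {
        buffer.add(element);
        if (buffer.size() == maxSize) {
            flush();
        }
    }

    // Emit whatever is buffered, e.g. at end of input or after a timeout
    void flush() {
        if (!buffer.isEmpty()) {
            sink.accept(buffer);
            buffer = new ArrayList<>(); // fresh buffer; the emitted list is never touched again
        }
    }

    public static void main(String[] args) {
        List<Integer> sizes = new ArrayList<>();
        Batcher<String> batcher = new Batcher<>(60, batch -> sizes.add(batch.size()));
        for (int i = 0; i < 130; i++) {
            batcher.add("e" + i);
        }
        batcher.flush();
        System.out.println(sizes); // [60, 60, 10]
    }
}
```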
The lists will be sent to an API that accepts at most 60 elements per request. Currently I'm doing this with windowing, but the only count-based trigger is `AfterPane.elementCountAtLeast`, which fires once a pane holds at least the given number of elements (possibly more). So I have to collect each pane into a list, count again, and split the list if it is too long. This is cumbersome and produces a lot of lists with only a few elements:
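```java
        Repeatedly.forever(AfterFirst.of(
            // Fire once the pane holds at least maxNrOfelementsPerList elements...
            AfterPane.elementCountAtLeast(maxNrOfelementsPerList),
            // ...or one minute after the first element of the pane arrived
            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1)))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    // Gather each fired pane into one list (it may exceed the limit)
    .apply("CollectIntoLists", Combine.globally(new StringToListCombinator()).withoutDefaults())
    // Split oversized lists into chunks of at most maxNrOfelementsPerList
    .apply("SplitListsToMaxSize", ParDo.of(new DoFn<List<String>, List<String>>() {
        @ProcessElement
        public void apply(ProcessContext pc) {
            splitList(pc.element(), maxNrOfelementsPerList).forEach(pc::output);
        }
    }));
```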
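The `splitList` helper used above is not shown in the question. The sketch below assumes it cuts a list into consecutive chunks of at most `maxSize` elements. The class name `ListSplitter` and this implementation are hypothetical. The `main` method shows where the small leftover lists come from. `elementCountAtLeast(60)` can fire on a pane holding more than 60 elements, so a 61-element pane becomes one list of 60 plus one list of 1.

```java
import java.util.ArrayList;
import java.util.List;

class ListSplitter {
    // Hypothetical stand-in for the question's splitList helper:
    // consecutive chunks of at most maxSize elements; only the last may be smaller.
    static <T> List<List<T>> splitList(List<T> list, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < list.size(); start += maxSize) {
            int end = Math.min(start + maxSize, list.size());
            // Copy the subList view so each chunk is independent of the source list
            chunks.add(new ArrayList<>(list.subList(start, end)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        // A pane that overshot the 60-element threshold by one
        List<String> pane = new ArrayList<>();
        for (int i = 0; i < 61; i++) {
            pane.add("e" + i);
        }
        for (List<String> chunk : splitList(pane, 60)) {
            System.out.println(chunk.size()); // prints 60, then 1
        }
    }
}
```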
Is there a more direct and consistent way to do this aggregation?