We figured out how to create a custom combine function in Beam SDK 2.0 (after lots of guesswork and reading the Beam SDK 2.0 source), since the Dataflow SDK 1.x syntax does not work in SDK 2.0.
However, we can't figure out how to create a custom combine PER KEY function in beam sdk 2.0. Any help or pointers (or better yet an actual example) would be greatly appreciated. (We scoured the internet for documentation or examples and found none; we also attempted to look at the code within beam sdk 2.0's Combine class, but couldn't figure it out, especially since the PerKey class now has a private constructor, so we can't extend it any longer.)
In case it helps, here's how we correctly created a custom combiner (without keys) in Beam SDK 2.0; what we can't figure out is how to create one with a key:
public class CombineTemplateIntervalsIntoBlocks
    extends Combine.AccumulatingCombineFn<ImmutableMySetOfIntervals, TemplateIntervalAccum, ArrayList<ImmutableMySetOfIntervals>> {
  public CombineTemplateIntervalsIntoBlocks() {
  }
  @Override
  public TemplateIntervalAccum createAccumulator() {
    return new TemplateIntervalAccum();
  }
and then
public class TemplateIntervalAccum
implements Combine.AccumulatingCombineFn.Accumulator<ImmutableMySetOfIntervals, TemplateIntervalAccum, ArrayList<ImmutableMySetOfIntervals>>, Serializable {
...
You don't need to create your CombineFn differently to use a Combine.PerKey.
You can extend either AccumulatingCombineFn (which puts the merging logic in the accumulator) or CombineFn (which puts the merging logic in the CombineFn). There are also other options such as BinaryCombineFn and IterableCombineFn.
Say that you have a CombineFn<InputT, AccumT, OutputT> called combineFn:
Use Combine.globally(combineFn) to create a PTransform that takes a PCollection<InputT> and combines all the elements.
Use Combine.perKey(combineFn) to create a PTransform that takes a PCollection<KV<K, InputT>> and combines all the values associated with each key. This corresponds to the Combine.PerKey I believe you are referring to.
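To illustrate why the same combine function serves both cases, here is a minimal plain-Java sketch with no Beam dependency. It is a model of the contract, not Beam itself: SimpleCombineFn is a hypothetical stand-in for CombineFn<InputT, AccumT, OutputT>, and MeanFn, combineGlobally and combinePerKey are illustrative names that only mimic what Combine.globally and Combine.perKey do with such a function.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CombineSketch {
  // Hypothetical stand-in for the four-method CombineFn contract.
  interface SimpleCombineFn<I, A, O> {
    A createAccumulator();
    A addInput(A accumulator, I input);
    A mergeAccumulators(Iterable<A> accumulators);
    O extractOutput(A accumulator);
  }

  // Example function: mean of integers. The accumulator is {sum, count}.
  static class MeanFn implements SimpleCombineFn<Integer, long[], Double> {
    public long[] createAccumulator() { return new long[] {0, 0}; }
    public long[] addInput(long[] acc, Integer input) {
      acc[0] += input;
      acc[1]++;
      return acc;
    }
    public long[] mergeAccumulators(Iterable<long[]> accs) {
      long[] merged = createAccumulator();
      for (long[] a : accs) { merged[0] += a[0]; merged[1] += a[1]; }
      return merged;
    }
    public Double extractOutput(long[] acc) {
      return acc[1] == 0 ? Double.NaN : (double) acc[0] / acc[1];
    }
  }

  // Folds a list of inputs into a single accumulator.
  static <I, A> A accumulate(SimpleCombineFn<I, A, ?> fn, List<I> inputs) {
    A acc = fn.createAccumulator();
    for (I in : inputs) acc = fn.addInput(acc, in);
    return acc;
  }

  // Global combine: two partial accumulators (as if from two workers), then a merge.
  static <I, A, O> O combineGlobally(SimpleCombineFn<I, A, O> fn, List<I> inputs) {
    int mid = inputs.size() / 2;
    List<A> partials = new ArrayList<>();
    partials.add(accumulate(fn, inputs.subList(0, mid)));
    partials.add(accumulate(fn, inputs.subList(mid, inputs.size())));
    return fn.extractOutput(fn.mergeAccumulators(partials));
  }

  // Per-key combine: the SAME function, with one accumulator kept per key.
  static <K, I, A, O> Map<K, O> combinePerKey(
      SimpleCombineFn<I, A, O> fn, List<Map.Entry<K, I>> inputs) {
    Map<K, A> accs = new LinkedHashMap<>();
    for (Map.Entry<K, I> kv : inputs) {
      A acc = accs.containsKey(kv.getKey()) ? accs.get(kv.getKey()) : fn.createAccumulator();
      accs.put(kv.getKey(), fn.addInput(acc, kv.getValue()));
    }
    Map<K, O> out = new LinkedHashMap<>();
    for (Map.Entry<K, A> e : accs.entrySet()) out.put(e.getKey(), fn.extractOutput(e.getValue()));
    return out;
  }

  public static void main(String[] args) {
    MeanFn fn = new MeanFn();
    System.out.println(combineGlobally(fn, Arrays.asList(1, 2, 3, 6))); // prints 3.0

    List<Map.Entry<String, Integer>> keyed = new ArrayList<>();
    keyed.add(new SimpleEntry<>("a", 1));
    keyed.add(new SimpleEntry<>("a", 3));
    keyed.add(new SimpleEntry<>("b", 10));
    System.out.println(combinePerKey(fn, keyed)); // prints {a=2.0, b=10.0}
  }
}
```

The function itself never sees a key; the grouping happens entirely in the code that drives it. In an actual pipeline the analogous step would be passing your existing CombineFn to Combine.perKey(combineFn) and applying the result to a PCollection<KV<K, InputT>>, as described above.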