I am working on a streaming Dataflow pipeline that consumes messages containing batched items from PubSub and eventually writes them to Datastore. For better parallelism, and for timely acknowledgement of the messages pulled from PubSub, I unpack the batches into individual items (sketched below, after the pipeline outline) and add a fusion breaker right after that step.
So the pipeline looks like this ...
PubSubIO -> deserialize -> unpack -> fusion break -> validation/transform -> DatastoreIO.
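For context, the unpack step is just a DoFn that flattens each incoming batch into its items. A minimal sketch, assuming the batches deserialize to a List<String>; the UnpackFn name and element types are illustrative, not the actual code:

import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical unpack step: emits each item of a batch as its own element,
// so the stages downstream of the fusion break can process items in parallel.
class UnpackFn extends DoFn<List<String>, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    for (String item : c.element()) {
      c.output(item);
    }
  }
}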
Here is my fusion breaker, largely copied from the JdbcIO class. It uses a processing-time trigger to break up the data sitting in the global window.
public class BreakFusionTransform<T> extends PTransform<PCollection<T>, PCollection<T>> {

  @Override
  public PCollection<T> expand(PCollection<T> input) {
    return input
        // Assign a random key to each element so the subsequent GroupByKey
        // spreads the elements across workers.
        .apply(ParDo.of(new RandomKeyFn<T>()))
        // Stay in the global window, but fire a pane roughly every two seconds
        // of processing time after the first element arrives, discarding panes
        // once they have fired.
        .apply(Window.<KV<Integer, T>>triggering(
            Repeatedly.forever(
                AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(2L))))
            .discardingFiredPanes())
        // The GroupByKey materializes the elements, which breaks fusion with
        // the upstream stages; then drop the keys and re-flatten.
        .apply(GroupByKey.<Integer, T>create())
        .apply(Values.<Iterable<T>>create())
        .apply(Flatten.<T>iterables());
  }

  private static class RandomKeyFn<T> extends DoFn<T, KV<Integer, T>> {
    private Random random;

    @Setup
    public void setup() {
      random = new Random();
    }

    @ProcessElement
    public void processElement(ProcessContext context) {
      context.output(KV.of(random.nextInt(), context.element()));
    }
  }
}
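And here is how the transform is wired in, as a minimal self-contained sketch: Create.of stands in for the real PubSubIO read and deserialize steps, UnpackFn is the hypothetical unpack DoFn sketched above, and the validation/transform and DatastoreIO stages are omitted.

import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class FusionBreakExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Create.of replaces PubSubIO + deserialize in this sketch; each element
    // is a "batch" of items, exactly what the unpack step expects.
    PCollection<String> items = p
        .apply("FakeBatches",
            Create.of(Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c")))
                .withCoder(ListCoder.of(StringUtf8Coder.of())))
        .apply("Unpack", ParDo.of(new UnpackFn()))
        .apply("BreakFusion", new BreakFusionTransform<String>());

    // 'items' would then go through validation/transform and the Datastore write.
    p.run();
  }
}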
It works most of the time, except on several occasions when it produces fewer outputs than inputs, even after the streaming input is done and the pipeline has been idle for ten minutes.
This can be seen in the Dataflow job monitoring console below; the screenshot was taken after the job was drained, having waited around 10 minutes for the data to come out of the transform.
*Can someone think of an explanation for that? It feels as if the fusion breaker is holding back or has lost some items.*
I noticed that it only happens when the data volume / data rate is high, forcing the pipeline to scale up in the middle of the test run, doubling from 25 to 50 n1-highmem-2 workers. However, I haven't done enough tests to verify whether the scale-up is the key to reproducing this problem.
Or maybe the trigger, firing once every two seconds, fires too frequently?
I am using Dataflow 2.0.0-beta1. The Job Id is "2017-02-23_23_15_34-14025424484787508627".