Why is my fusion breaker losing or holding back data?

Posted 2019-09-16 05:32

I am working on a streaming Dataflow pipeline that consumes messages of batched items from PubSub and eventually writes them to Datastore. For better parallelism, and also for timely acknowledgement of the messages pulled from PubSub, I unpack the batches into individual items and add a fusion breaker right after that step.

So the pipeline looks like this ...

PubSubIO -> deserialize -> unpack -> fusion break -> validation/transform -> DatastoreIO.

Here is my fusion breaker, largely copied from the JdbcIO class. It uses a trigger to break up the data in the global window.

import java.util.Random;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class BreakFusionTransform<T> extends PTransform<PCollection<T>, PCollection<T>> {

  @Override
  public PCollection<T> expand(PCollection<T> input) {
    return input
        // Assign a random key to every element so the GroupByKey below
        // redistributes the data, breaking fusion with the upstream steps.
        .apply(ParDo.of(new RandomKeyFn<T>()))
        // Stay in the global window, but fire a pane repeatedly, two seconds
        // of processing time after the first element of each pane arrives.
        .apply(Window.<KV<Integer, T>>triggering(
            Repeatedly.forever(
                AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(2L))))
            .discardingFiredPanes())
        .apply(GroupByKey.<Integer, T>create())
        // Drop the keys and flatten the grouped iterables back into elements.
        .apply(Values.<Iterable<T>>create())
        .apply(Flatten.<T>iterables());
  }

  private static class RandomKeyFn<T> extends DoFn<T, KV<Integer, T>> {
    private Random random;

    @Setup
    public void setup() {
      random = new Random();
    }

    @ProcessElement
    public void processElement(ProcessContext context) {
      context.output(KV.of(random.nextInt(), context.element()));
    }
  }
}
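As an aside, a variation I have seen elsewhere (a sketch with hypothetical names, not part of the pipeline above) bounds the random key space so that the GroupByKey shuffles into a fixed number of shards rather than effectively one key per element:

```java
import java.util.Random;

// Hypothetical variant of RandomKeyFn's key assignment: instead of
// random.nextInt(), which spreads elements over the full int range,
// draw keys from a bounded range so GroupByKey produces at most
// NUM_SHARDS groups per pane. NUM_SHARDS is an illustrative value.
public class ShardKey {
  static final int NUM_SHARDS = 50;

  static int assignShard(Random random) {
    // nextInt(bound) returns a value in [0, NUM_SHARDS).
    return random.nextInt(NUM_SHARDS);
  }

  public static void main(String[] args) {
    Random random = new Random();
    for (int i = 0; i < 1000; i++) {
      int key = assignShard(random);
      if (key < 0 || key >= NUM_SHARDS) {
        throw new AssertionError("key out of range: " + key);
      }
    }
    System.out.println("all keys in [0, " + NUM_SHARDS + ")");
  }
}
```

A bounded key space trades some parallelism for fewer, larger groups per trigger firing; whether that helps here is an open question.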

It works most of the time, except on several occasions when it produces fewer outputs than inputs, even after the streaming input is done and the pipeline has sat idle for ten minutes.

This can be seen in the Dataflow job monitoring console below. The screenshot was taken after the job was drained, and after I had waited around 10 minutes for the data to come out of the transform.

[Screenshot: Dataflow job monitoring console showing the element counts for the fusion-break step.]

Can someone think of an explanation for that? It feels as if the fusion breaker is holding back or has lost some items.

I noticed that it only happens when the data volume / data rate is high, forcing the pipeline to scale up in the middle of the test run, doubling from 25 to 50 n1-highmem-2 workers. However, I haven't done enough tests to verify whether the scaling up is the key to reproducing this problem.

Or maybe the trigger fires too frequently, at once every two seconds?
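If the two-second firing rate turns out to be part of the problem, a less aggressive trigger could combine an element-count condition with a longer processing-time delay, so a pane fires as soon as it reaches a size threshold or after the delay, whichever comes first. A sketch only, with arbitrarily chosen thresholds, as a drop-in replacement for the Window step above:

```java
// Sketch: fire a pane once it holds 1000 elements, or at the latest
// 10 seconds after its first element. The thresholds (1000, 10s) are
// illustrative, not tuned values.
.apply(Window.<KV<Integer, T>>triggering(
        Repeatedly.forever(
            AfterFirst.of(
                AfterPane.elementCountAtLeast(1000),
                AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(10L)))))
    .discardingFiredPanes())
```

AfterFirst and AfterPane live in org.apache.beam.sdk.transforms.windowing, alongside the triggers already used above.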

I am using Dataflow 2.0.0-beta1. The Job Id is "2017-02-23_23_15_34-14025424484787508627".

1 Answer

贼婆χ · 2019-09-16 05:50

Counters in streaming Dataflow are best-effort measures; autoscaling in particular may cause larger discrepancies. The pipeline should not lose data in this case.
