Discrepancy in running Apache Beam Data Generator

Posted 2019-08-28 04:06

Question:

This question is related to my earlier post about benchmarking Apache Beam with an on-the-fly data generator.

I have the following code to generate data within my pipeline:

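import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Runner is chosen on the command line, e.g. --runner=direct or --runner=FlinkRunner
Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

// Generate 1, 2, 3, ... but stop reading after 3 seconds
PCollection<Long> data = pipeline.apply(GenerateSequence.from(1)
                         .withMaxReadTime(Duration.millis(3000)));

//Print generated data
data.apply(ParDo.of(new DoFn<Long, String>() {
  @ProcessElement
  public void processElement(@Element Long input) {
    System.out.println(input);
  }
}));

pipeline.run();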

If I run this code with the DirectRunner (--runner=direct), I don't see any generated values on my console.

If I run the same code with the FlinkRunner (--runner=FlinkRunner), I can see the generated data in the console output, like the following:

4106
4109
4083
...

Another issue is that, even though I set the maximum read time to 3 seconds, the generator never stops!
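For comparison, the behaviour expected from withMaxReadTime(Duration.millis(3000)) is that the sequence is read only until the 3-second budget is spent, after which reading ends. The plain-Java sketch below only illustrates that expectation; it is an assumption-labelled toy, not how Beam implements GenerateSequence or max-read-time bounding, and the names MaxReadTimeSketch and generateFor are hypothetical.

```java
import java.time.Duration;
import java.util.function.LongConsumer;

// Hypothetical illustration only: NOT Beam's implementation of GenerateSequence
// or withMaxReadTime. It emits 1, 2, 3, ... until a wall-clock budget is spent.
class MaxReadTimeSketch {

    // Emits consecutive longs starting at 'from' to 'sink' until 'maxReadTime'
    // has elapsed, then returns how many values were emitted.
    static long generateFor(long from, Duration maxReadTime, LongConsumer sink) {
        long deadline = System.nanoTime() + maxReadTime.toNanos();
        long next = from;
        long emitted = 0;
        // Compare via subtraction, the overflow-safe way to compare nanoTime values
        while (System.nanoTime() - deadline < 0) {
            sink.accept(next++);
            emitted++;
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Same budget as in the question: 3 seconds. The sink discards values
        // here, because printing every one would dominate the run time.
        long count = generateFor(1, Duration.ofMillis(3000), value -> { });
        System.out.println("Stopped after emitting " + count + " values");
    }
}
```

Passing System.out::println as the sink instead of the no-op lambda would print 1, 2, 3, ... and then return once the budget is used up, which is the bounded behaviour the pipeline above is expected to show.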

If I simply omit the max read time from the code and run only the following:

PCollection<Long> data = pipeline.apply(GenerateSequence.from(1));

//Print generated data
data.apply(ParDo.of(new DoFn<Long, String>() {
  @ProcessElement
  public void processElement(@Element Long input) {
    System.out.println(input);
  }
}));

pipeline.run();

Both DirectRunner and FlinkRunner can output the generated values to the console without issues.

Does anyone have an idea why I could be facing this issue?