This question is related to my earlier post about benchmarking Apache Beam with an on-the-fly data generator.
I have the following code to generate data within my pipeline:
PCollection<Long> data = pipeline.apply(GenerateSequence.from(1)
.withMaxReadTime(Duration.millis(3000)));
//Print generated data
data.apply(ParDo.of(new DoFn<Long, String>() {
@ProcessElement
public void processElement(@Element Long input) {
System.out.println(input);
}
}));
pipeline.run();
If I run this code with DirectRunner (--runner=direct) I don't see the generated values on my console.
If I run the same code with FlinkRunner (--runner=FlinkRunner) I can see the generated data in the console output like the following
4106
4109
4083
.
.
.
Another issue is that, even though I declare the maximum read time as 3 seconds, the generator never stops!
If I simply omit the max read time from the code and run only the following:
PCollection<Long> data = pipeline.apply(GenerateSequence.from(1));
//Print generated data
data.apply(ParDo.of(new DoFn<Long, String>() {
@ProcessElement
public void processElement(@Element Long input) {
System.out.println(input);
}
}));
pipeline.run();
Both DirectRunner and FlinkRunner can output the generated values to the console without issues.
Does anyone have an idea why I could be facing this issue?