My goal is to benchmark the latency and throughput of Apache Beam on a streaming use case with different window queries.
I want to create my own data with an on-the-fly data generator so I can control the generation rate manually, and consume this data directly in a pipeline without a pub/sub mechanism in between, i.e. I don't want to read the data from a broker, etc., in order to avoid bottlenecks.
Is there a way to do something like this? Or is there any source code for such a use case with the Beam SDKs?
So far I couldn't find a starting point; the existing code samples use a pub/sub mechanism and assume the data comes from somewhere else.
Thanks in advance for any suggestions.
With regard to on-the-fly data, one option is to make use of GenerateSequence, for example:
pipeline.apply(GenerateSequence.from(0).withRate(RATE, Duration.millis(1000)))
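With a rate and no upper bound this produces an unbounded PCollection, which suits a continuous benchmark. If you want a fixed-size run for repeatable measurements, you should also be able to cap it with to(); a minimal sketch, where RATE and the element count are placeholder values:

// Emits Longs up to the bound, still throttled to RATE elements per second.
pipeline.apply(GenerateSequence.from(0).to(1_000_000).withRate(RATE, Duration.millis(1000)))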
To create other types of objects, you can add transforms after it that consume the Long and turn it into something else, as the FlatMapElements and ParDo below do:
import java.util.stream.*;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.*;
import org.joda.time.Duration;

Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
// Generate 2 Longs per second, window them, and fan each Long out into two KV pairs.
p.apply(GenerateSequence.from(0).withRate(2, Duration.millis(1000)))
    .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
    .apply(FlatMapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
        .via(i -> IntStream.range(0, 2)
            .mapToObj(k -> KV.of(String.format("Gen Value %s", i), String.format("FlatMap Value %s ", k)))
            .collect(Collectors.toList())))
    .apply(ParDo.of(new DoFn<KV<String, String>, String>() {
      @ProcessElement
      public void process(@Element KV<String, String> input) {
        LOG.info("Value was {}", input); // LOG: an SLF4J logger on the enclosing class
      }
    }));
p.run();
That should generate values like:
Value was KV{Gen Value 0, FlatMap Value 0 }
Value was KV{Gen Value 0, FlatMap Value 1 }
Value was KV{Gen Value 1, FlatMap Value 0 }
Value was KV{Gen Value 1, FlatMap Value 1 }
Value was KV{Gen Value 2, FlatMap Value 0 }
Value was KV{Gen Value 2, FlatMap Value 1 }
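Since the goal is benchmarking window queries, one way to extend this into something measurable is to aggregate per key within each window and log how far processing time lags behind the output timestamp. A minimal sketch that would replace the logging ParDo above; the Count.perKey step and the lag probe are illustrative additions, not part of the original pipeline:

import org.apache.beam.sdk.transforms.Count;
import org.joda.time.Instant;

    // ... after the FlatMapElements step above ...
    .apply(Count.perKey()) // one count per key per 1-second window
    .apply(ParDo.of(new DoFn<KV<String, Long>, Void>() {
      @ProcessElement
      public void process(@Element KV<String, Long> count, @Timestamp Instant ts) {
        // Lag of processing time behind the output timestamp (the window end,
        // by default for aggregations): a crude per-window latency signal.
        LOG.info("Count {} lag {} ms", count, System.currentTimeMillis() - ts.getMillis());
      }
    }));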
Some other things to keep in mind when performance testing your pipelines:
The Direct runner is designed for unit testing; it does useful things like simulating failures, which helps catch issues that would otherwise only show up in a production pipeline. It is not designed to help with performance testing, however. I would recommend always using one of the main runners for those types of integration tests.
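The runner is normally selected through PipelineOptions, so the same pipeline can be pointed at a production runner from the command line. A sketch; FlinkRunner is just one example, and the matching runner artifact must be on the classpath:

import org.apache.beam.sdk.options.PipelineOptions;

// Accepts e.g. --runner=FlinkRunner (or DataflowRunner, SparkRunner, ...).
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);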
Please be aware of the fusion optimization (Link to Docs): when using an artificial data source like GenerateSequence, you may need to do a GBK as the next step to allow the work to be parallelized, as in the sketch below. For the Dataflow runner, more info can be found here: Link to Docs
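One common way to break fusion after the generator is a Reshuffle, which redistributes elements via a GBK-style shuffle so downstream transforms are no longer fused to the source. A minimal sketch:

import org.apache.beam.sdk.transforms.Reshuffle;

p.apply(GenerateSequence.from(0).withRate(2, Duration.millis(1000)))
    // Break fusion so downstream work can parallelize beyond the source.
    .apply(Reshuffle.viaRandomKey())
    // ... rest of the pipeline ...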
In general for performance testing, I would recommend testing the whole end-to-end pipeline. There are interactions with sources and sinks (for example, watermarks) which will not be exercised by a standalone pipeline.
Hope that helps.