我的目标是基准延迟和流数据的使用情况用不同的窗口查询吞吐量阿帕奇梁。
我想用的即时数据生成器创建自己的数据来手动控制数据生成速率,并直接从管道消耗这个数据没有发布/订阅机制,即我不想读从数据经纪人等,以避免出现瓶颈。 有没有做类似的什么,我想实现的东西的一种方式? 或者是有这样的用例与梁的SDK任何源代码? 到目前为止,我无法找到一个起点,现有的代码示例使用发布/订阅机制,他们认为数据来自什么地方。
感谢您的建议提前。
我的目标是基准延迟和流数据的使用情况用不同的窗口查询吞吐量阿帕奇梁。
我想用的即时数据生成器创建自己的数据来手动控制数据生成速率,并直接从管道消耗这个数据没有发布/订阅机制,即我不想读从数据经纪人等,以避免出现瓶颈。 有没有做类似的什么,我想实现的东西的一种方式? 或者是有这样的用例与梁的SDK任何源代码? 到目前为止,我无法找到一个起点,现有的代码示例使用发布/订阅机制,他们认为数据来自什么地方。
感谢您的建议提前。
至于在即时数据,一个办法是利用GenerateSequence的,例如:
pipeline.apply(GenerateSequence.from(0).withRate(RATE,Duration.millis(1000)))
要创建其他类型的对象,你可以使用一个帕尔多后消耗的龙,并使其成别的东西:
Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
p.apply(GenerateSequence.from(0).withRate(2, Duration.millis(1000)))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
.apply(FlatMapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
.via(i -> IntStream.range(0,2).mapToObj(k -> KV.of(String.format("Gen Value %s" , i),String.format("FlatMap Value %s ", k))).collect(Collectors.toList())))
.apply(ParDo.of(new DoFn<KV<String,String>, String>() {
@ProcessElement
public void process(@Element KV<String,String> input){
LOG.info("Value was {}", input);
}
}));
p.run();
这应该产生这样的值:
Value was KV{Gen Value 0, FlatMap Value 0 }
Value was KV{Gen Value 0, FlatMap Value 1 }
Value was KV{Gen Value 1, FlatMap Value 0 }
Value was KV{Gen Value 1, FlatMap Value 1 }
Value was KV{Gen Value 2, FlatMap Value 0 }
Value was KV{Gen Value 2, FlatMap Value 1 }
一些其他的事情要记住你的管道性能测试:
直接亚军是专为单元测试,但它确实像模拟故障很酷的事情,这有助于将运行的生产流水线时可以看到渔获物问题。 它旨在帮助性能测试不过。 我总是使用与主浇道这些类型的集成测试建议。
请注意融合优化的链接文件 ,使用人工数据源一样GenerateSequence时,你可能需要做一个GBK作为下一步,使工作得以并行。 对于数据流亚军的详细信息可以在这里找到: 链接到Google文档
通常,对于性能测试,我会建议测试整个端到端管道。 存在与源和汇(例如水印),这将不在一个独立的管道被测试的相互作用。
希望帮助。