我有一个KTable聚集的拓扑结构。 这是我创建的基础上不同的主题我有这种拓扑结构的通用方法。
public static <A, B, C> KTable<C, Set<B>> groupTable(KTable<A, B> table, Function<B, C> getKeyFunction,
Serde<C> keySerde, Serde<B> valueSerde, Serde<Set<B>> aggregatedSerde) {
return table
.groupBy((key, value) -> KeyValue.pair(getKeyFunction.apply(value), value),
Serialized.with(keySerde, valueSerde))
.aggregate(() -> new HashSet<>(), (key, newValue, agg) -> {
agg.remove(newValue);
agg.add(newValue);
return agg;
}, (key, oldValue, agg) -> {
agg.remove(oldValue);
return agg;
}, Materialized.with(keySerde, aggregatedSerde));
}
使用时,卡夫卡,而不是通过`TopologyTestDriver`测试时,这工作得很好。
在这两种情况下,当我得到更新,则subtractor
首先被调用,然后adder
被调用。 问题是,使用时TopologyTestDriver
,两条消息被更新送出:前赴后继subtractor
调用,以及一个又一个adder
调用。 且不说在所述后发送的消息subrtractor
和前adder
是在一个不正确的阶段。
任何人能证实这是一个错误? 我两个卡夫卡2.0.1和2.1.0测试这一点。
编辑:
我创建了一个GitHub的测试用例来说明这个问题: https://github.com/mulho/topology-testcase