TopologyTestDriver上KTable聚合发送不正确的消息(TopologyTestDr

2019-10-16 12:48发布

我有一个KTable聚集的拓扑结构。 这是我创建的基础上不同的主题我有这种拓扑结构的通用方法。

public static <A, B, C> KTable<C, Set<B>> groupTable(KTable<A, B> table, Function<B, C> getKeyFunction,
        Serde<C> keySerde, Serde<B> valueSerde, Serde<Set<B>> aggregatedSerde) {
    return table
            .groupBy((key, value) -> KeyValue.pair(getKeyFunction.apply(value), value),
                    Serialized.with(keySerde, valueSerde))
            .aggregate(() -> new HashSet<>(), (key, newValue, agg) -> {
                agg.remove(newValue);
                agg.add(newValue);
                return agg;
            }, (key, oldValue, agg) -> {
                agg.remove(oldValue);
                return agg;
            }, Materialized.with(keySerde, aggregatedSerde));
}

使用时,卡夫卡,而不是通过`TopologyTestDriver`测试时,这工作得很好。

在这两种情况下,当我得到更新,则subtractor首先被调用,然后adder被调用。 问题是,使用时TopologyTestDriver ,两条消息被更新送出:前赴后继subtractor调用,以及一个又一个adder调用。 且不说在所述后发送的消息subrtractor和前adder是在一个不正确的阶段。

任何人能证实这是一个错误? 我两个卡夫卡2.0.1和2.1.0测试这一点。

编辑:
我创建了一个GitHub的测试用例来说明这个问题: https://github.com/mulho/topology-testcase

Answer 1:

这是预期的行为有两个输出记录(一个“减”的记录,一个“加”记录)。 这是一个有点棘手,了解它是如何工作的,所以让我尝试解释。

假设你有以下的输入表:

 key |  value
-----+---------
  A  |  <10,2>
  B  |  <10,3>
  C  |  <11,4>

KTable#groupBy()您提取该值作为新的密钥的第一部分(即, 1011 )和后求和的第二部分(即, 234中的聚合)。 因为AB记录都有10作为新的重点,你会总结2+3 ,你也将总结4新的关键11 。 结果表将是:

 key |  value
-----+---------
  10 |  5
  11 |  4

现在假设一个更新记录<B,<11,5>>改变原有的输入KTable到:

 key |  value
-----+---------
  A  |  <10,2>
  B  |  <11,5>
  C  |  <11,4>

因此,新的结果表应该总结5+411210

 key |  value
-----+---------
  10 |  2
  11 |  9

如果你有第二次比较的第一个结果表,你可能会注意到, 行得到了更新。 旧B|<10,3>记录从减去10|5导致10|2和新的B|<11,5>记录被添加到11|4导致11|9

这就是你看到的两个输出记录。 第一输出记录(执行减后),更新第一行(它减去旧值不是聚集的结果的一部分的任何更长),而第二个记录添加新的价值的聚合的结果。 在我们的例子中,减纪录是<10,<null,<10,3>>>并添加记录是<11,<<11,5>,null>> (在这些记录的格式是<key, <plus,minus>> (注意减去记录只设置minus部分,而添加记录只设置plus部分)。

最后再说一句:这是不可能把加号和减号记录在一起,因为正负记录的密钥可以是不同的(在我们的例子1110 ),从而可能会进入不同的分区。 这意味着,加号和减号操作可能由不同的机器上运行,因此它不可能只发出一个包含两个加号和减号第一部分记录。



文章来源: TopologyTestDriver sending incorrect message on KTable aggregations