I have a topology that aggregates a KTable. This is a generic method I created to build this topology for the different topics I have:
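```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;

public static <A, B, C> KTable<C, Set<B>> groupTable(KTable<A, B> table, Function<B, C> getKeyFunction,
        Serde<C> keySerde, Serde<B> valueSerde, Serde<Set<B>> aggregatedSerde) {
    return table
            .groupBy((key, value) -> KeyValue.pair(getKeyFunction.apply(value), value),
                    Serialized.with(keySerde, valueSerde))
            .aggregate(() -> new HashSet<>(), (key, newValue, agg) -> {
                // adder: remove first so an equal element is replaced by the new instance
                agg.remove(newValue);
                agg.add(newValue);
                return agg;
            }, (key, oldValue, agg) -> {
                // subtractor: drop the old value from the group it belonged to
                agg.remove(oldValue);
                return agg;
            }, Materialized.with(keySerde, aggregatedSerde));
}
```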
This works well when running against Kafka, but not when testing with `TopologyTestDriver`.

In both scenarios, when I get an update, the `subtractor` is called first and then the `adder`. The problem is that with `TopologyTestDriver`, two messages are sent out for each update: one after the `subtractor` call and another one after the `adder` call. On top of that, the message sent after the `subtractor` and before the `adder` is in an incorrect state.

Can anyone else confirm whether this is a bug? I've tested this with both Kafka 2.0.1 and 2.1.0.
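The two emissions can be reproduced without Kafka. Below is a minimal plain-Java sketch. It assumes, as described above, that an update calls the subtractor before the adder and that every call is forwarded downstream. The class `SetAggregationSim`, its helpers, and the sample keys and values are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class SetAggregationSim {
    // Aggregation state: group key -> set of values (mirrors the KTable<C, Set<B>> result)
    static final Map<String, Set<String>> state = new HashMap<>();
    // Every downstream update, rendered as "key=[values]"
    static final List<String> emitted = new ArrayList<>();

    static void emit(String key) {
        // TreeSet gives a deterministic, sorted rendering of the set
        emitted.add(key + "=" + new TreeSet<>(state.get(key)));
    }

    // Same logic as the adder in groupTable
    static void add(String key, String value) {
        Set<String> agg = state.computeIfAbsent(key, k -> new HashSet<>());
        agg.remove(value);
        agg.add(value);
        emit(key);
    }

    // Same logic as the subtractor in groupTable
    static void subtract(String key, String value) {
        state.computeIfAbsent(key, k -> new HashSet<>()).remove(value);
        emit(key);
    }

    public static void main(String[] args) {
        // Initial inserts have no old value, so only the adder runs
        add("g1", "x");
        add("g1", "y");
        emitted.clear();

        // Update: value y becomes y2 (same group key);
        // the subtractor runs first with the old value, then the adder with the new one
        subtract("g1", "y");
        add("g1", "y2");

        System.out.println(emitted); // [g1=[x], g1=[x, y2]]
    }
}
```

The first printed update is the intermediate set without `y`. This corresponds to the message described above as being in an incorrect state.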
EDIT:
I created a test case on GitHub to illustrate the issue: https://github.com/mulho/topology-testcase
It is expected behavior that there are two output records (one "minus" record, and one "plus" record). It's a little tricky to understand how it works, so let me try to explain.
Assume you have an input table with the records `A|<10,2>` and `B|<10,3>`, plus a third record with value `<11,4>`.

In `KTable#groupBy()` you extract the first part of the value as the new key (i.e., `10` or `11`). Later, in the aggregation, you sum the second part (i.e., `2`, `3`, `4`). Records `A` and `B` both have `10` as the new key, so you would sum `2+3`. You would also sum `4` for the new key `11`. The result table would be `10|5` and `11|4`.

Now assume that an update record `<B,<11,5>>` changes the original input KTable to `A|<10,2>` and `B|<11,5>`, with the `<11,4>` record unchanged. Thus, the new result table should sum up `5+4` for `11` and `2` for `10`, i.e., `10|2` and `11|9`.

If you compare the first result table with the second, you might notice that both rows were updated. The old `B|<10,3>` record is subtracted from `10|5`, resulting in `10|2`. The new `B|<11,5>` record is added to `11|4`, resulting in `11|9`.

These are exactly the two output records you see. The first output record (after the subtractor is executed) updates the first row: it subtracts the old value that is no longer part of the aggregation result. The second record adds the new value to the aggregation result. In our example, the subtract record would be `<10,<null,<10,3>>>` and the add record would be `<11,<<11,5>,null>>`. The format of those records is `<key,<plus,minus>>`. Note that the subtract record sets only the `minus` part, while the add record sets only the `plus` part.

Final remark: it is not possible to combine the plus and minus records into one. Their keys can be different (in our example `11` and `10`), so they might go to different partitions. This means the plus and minus operations might be executed on different machines. Therefore, it is not possible to emit a single record that contains both the plus and the minus part.