I have a topology that aggregates on a KTable.
This is a generic method I created to build this topology on different topics I have.
public static <A, B, C> KTable<C, Set<B>> groupTable(KTable<A, B> table, Function<B, C> getKeyFunction,
Serde<C> keySerde, Serde<B> valueSerde, Serde<Set<B>> aggregatedSerde) {
return table
.groupBy((key, value) -> KeyValue.pair(getKeyFunction.apply(value), value),
Serialized.with(keySerde, valueSerde))
.aggregate(() -> new HashSet<>(), (key, newValue, agg) -> {
agg.remove(newValue);
agg.add(newValue);
return agg;
}, (key, oldValue, agg) -> {
agg.remove(oldValue);
return agg;
}, Materialized.with(keySerde, aggregatedSerde));
}
This works pretty well when using Kafka, but not when testing via `TopologyTestDriver`.
In both scenarios, when I get an update, the subtractor
is called first, and then the adder
is called. The problem is that when using the TopologyTestDriver
, two messages are sent out for updates: one after the subtractor
call, and another one after the adder
call. Not to mention that the message that is sent after the subrtractor
and before the adder
is in an incorrect stage.
Any one else could confirm this is a bug? I've tested this for both Kafka versions 2.0.1 and 2.1.0.
EDIT:
I created a testcase in github to illustrate the issue: https://github.com/mulho/topology-testcase
It is expected behavior that there are two output records (one "minus" record, and one "plus" record). It's a little tricky to understand how it works, so let me try to explain.
Assume you have the following input table:
key | value
-----+---------
A | <10,2>
B | <10,3>
C | <11,4>
On KTable#groupBy()
you extract the first part of the value as new key (ie, 10
or 11
) and later sum the second part (ie, 2
, 3
, 4
) in the aggregation. Because A
and B
record both have 10
as new key, you would sum 2+3
and you would also sum 4
for new key 11
. The result table would be:
key | value
-----+---------
10 | 5
11 | 4
Now assume that an update record <B,<11,5>>
change the original input KTable to:
key | value
-----+---------
A | <10,2>
B | <11,5>
C | <11,4>
Thus, the new result table should sum up 5+4
for 11
and 2
for 10
:
key | value
-----+---------
10 | 2
11 | 9
If you compare the first result table with the second, you might notice that both rows got update. The old B|<10,3>
record is subtracted from 10|5
resulting in 10|2
and the new B|<11,5>
record is added to 11|4
resulting in 11|9
.
This is exactly the two output records you see. The first output record (after subtract is executed), updates the first row (it subtracts the old value that is not part of the aggregation result any longer), while the second record adds the new value to the aggregation result. In our example, the subtract record would be <10,<null,<10,3>>>
and the add record would be <11,<<11,5>,null>>
(the format of those record is <key, <plus,minus>>
(note that the subtract record only set the minus
part while the add record only set the plus
part).
Final remark: it is not possible to put plus and minus records together, because the key of the plus and minus record can be different (in our example 11
and 10
), and thus might go into different partitions. This implies that the plus and minus operation might be executed by different machines and thus it's not possible to only emit one record that contains both plus and minus part.