What I'd like to do is this:
- Consume records from a numbers topic (`Long` values)
- Aggregate (count) the values for each 5-second window
- Send the FINAL aggregation result to another topic
My code looks like this:
```java
KStream<String, Long> longs = builder.stream(
        Serdes.String(), Serdes.Long(), "longs");

// In one KTable, count by key on a five-second tumbling window.
KTable<Windowed<String>, Long> longCounts =
        longs.countByKey(TimeWindows.of("longCounts", 5000L));

// Finally, sink to the long-counts topic.
longCounts.toStream((wk, v) -> wk.key())
          .to("long-counts");
```
It looks like everything works as expected, but the aggregations are sent to the destination topic for each incoming record. My question is: how can I send only the final aggregation result of each window?
In Kafka Streams there is no such thing as a "final aggregation". Windows are kept open all the time to handle late-arriving records (of course, windows are not kept forever; they are discarded once their retention time expires -- however, there is no special action when a window gets discarded).
See Confluent documentation for more details: http://docs.confluent.io/current/streams/
Thus, for each update to an aggregation, a result record is produced (because Kafka Streams also updates the aggregation result on late-arriving records). Your "final result" would be the latest result record (before a window gets discarded). Depending on your use case, manual de-duplication would be a way to resolve the issue, using the lower-level Processor API via `transform()` or `process()`.
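For illustration, such a de-duplication step might buffer the latest count per window in a state store and forward buffered values only on a periodic wall-clock punctuation. This is only a minimal sketch, not the one true approach: the store name `"dedup-store"`, the 5-second flush interval, and the key format are all assumptions, and the store must be registered on the topology via `builder.addStateStore(...)` before use.

```java
import java.time.Duration;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Buffers the latest count per (hypothetical) "key@windowStart" key and
// emits each buffered value once per punctuation, dropping intermediates.
public class WindowDedupTransformer
        implements Transformer<String, Long, KeyValue<String, Long>> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        // "dedup-store" is an assumed name; it must be attached to the topology.
        this.store = (KeyValueStore<String, Long>) context.getStateStore("dedup-store");
        // Every 5 seconds (wall-clock), flush and clear everything buffered so far;
        // updates arriving in between simply overwrite the previous value.
        context.schedule(Duration.ofSeconds(5), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            try (final KeyValueIterator<String, Long> it = store.all()) {
                while (it.hasNext()) {
                    final KeyValue<String, Long> entry = it.next();
                    context.forward(entry.key, entry.value);
                    store.delete(entry.key);
                }
            }
        });
    }

    @Override
    public KeyValue<String, Long> transform(final String key, final Long count) {
        store.put(key, count); // buffer only; emit nothing downstream yet
        return null;
    }

    @Override
    public void close() { }
}
```

Upstream, the key would need to carry the window, e.g. `toStream((wk, v) -> wk.key() + "@" + wk.window().start())` before calling `transform(WindowDedupTransformer::new, "dedup-store")`, so counts from different windows do not overwrite each other. Note that a wall-clock flush is only a heuristic cut-off: a record arriving later than the flush interval would still produce a second output record for its window.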
This blog post might help, too: https://timothyrenner.github.io/engineering/2016/08/11/kafka-streams-not-looking-at-facebook.html
Another blog post addressing this issue without using punctuations: http://blog.inovatrend.com/2018/03/making-of-message-gateway-with-kafka.html
Update
With KIP-328, a KTable#suppress() operator was added that allows suppressing consecutive updates in a strict manner and emitting a single result record per window; the tradeoff is increased latency.
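As a sketch of how this looks with the newer API (suppress() and the Duration-based overloads require Kafka 2.1+; the 30-second grace period and the serde choices are illustrative assumptions):

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

final StreamsBuilder builder = new StreamsBuilder();

final KStream<String, Long> longs = builder.stream(
        "longs", Consumed.with(Serdes.String(), Serdes.Long()));

// Count per key on 5-second tumbling windows; grace() bounds how long
// late records are still accepted into a window.
final KTable<Windowed<String>, Long> longCounts = longs
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(5))
                .grace(Duration.ofSeconds(30)))
        .count();

// suppress() holds back all intermediate updates and emits exactly one
// record per window once the window closes (window end + grace period).
longCounts
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream((wk, v) -> wk.key())
        .to("long-counts", Produced.with(Serdes.String(), Serdes.Long()));
```

The tradeoff mentioned above applies here: nothing is emitted for a window until its grace period has passed, so each result is delayed by up to the window size plus the grace period.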