Given the following code:
KStream<String, Custom> stream =
builder.stream(Serdes.String(), customSerde, "test_in");
stream
.groupByKey(Serdes.String(), customSerde)
.reduce(new CustomReducer(), "reduction_state")
.print(Serdes.String(), customSerde);
I have a println statement inside the apply method of the Reducer, which successfully prints out when I expect the reduction to take place. However, the final print statement shown above displays nothing. Likewise, if I use a to method rather than print, I see no messages in the destination topic.
What do I need after the reduce statement to see the result of the reduction? If one value is pushed to the input, I don't expect to see anything. If a second value with the same key is pushed, I expect the reducer to apply (which it does), and I also expect the result of the reduction to continue to the next step in the processing pipeline. As described, I'm not seeing anything in subsequent steps of the pipeline, and I don't understand why.
As of Kafka 0.10.1.0, all aggregation operators use an internal de-duplication cache to reduce the load on the result KTable changelog stream. For example, if you count and process two records with the same key directly after each other, the full changelog stream would be <key:1>, <key:2>.
With the new caching feature, the cache receives <key:1> and stores it, but does not send it downstream right away. When <key:2> is computed, it replaces the first entry in the cache. Depending on the cache size, the number of distinct keys, the throughput, and your commit interval, the cache sends entries downstream. This happens either on cache eviction of a single key entry or as a complete flush of the cache (sending all entries downstream). Thus, the KTable changelog might only show <key:2> (because <key:1> got de-duplicated).
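As a toy sketch of that de-duplication (this is not Kafka's actual cache implementation, just a model of the overwrite-on-same-key behavior), a map keyed by record key keeps only the latest value until it is flushed downstream:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CacheDemo {
    public static void main(String[] args) {
        // Hypothetical in-memory model of the record cache:
        // a later update for the same key overwrites the earlier one.
        Map<String, Integer> cache = new LinkedHashMap<>();
        cache.put("key", 1);   // <key:1> stored, not yet forwarded downstream
        cache.put("key", 2);   // <key:2> replaces <key:1> in the cache
        // On flush/eviction, only the latest value per key goes downstream:
        cache.forEach((k, v) -> System.out.println("<" + k + ":" + v + ">"));
    }
}
```

Running this prints only <key:2>, mirroring why the intermediate <key:1> update never reaches the print statement.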
You can control the size of the cache via the Streams configuration parameter StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG. If you set the value to zero, you disable caching completely and the KTable changelog will contain all updates (effectively restoring pre-0.10.1.0 behavior).
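A minimal sketch of disabling the cache in your Streams configuration (the constant StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG resolves to the key "cache.max.bytes.buffering"; the raw string is used here so the snippet runs without the Kafka Streams jar):

```java
import java.util.Properties;

public class DisableCaching {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Equivalent to props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // setting the buffer to 0 bytes disables record caching entirely,
        // so every single update flows downstream to print() / to().
        props.put("cache.max.bytes.buffering", "0");
        System.out.println(props.getProperty("cache.max.bytes.buffering"));
    }
}
```

Pass these properties to your KafkaStreams instance as usual; with the cache disabled you should see both <key:1> and <key:2> in the output.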
Confluent documentation contains a section explaining the cache in more detail:
- http://docs.confluent.io/current/streams/architecture.html#record-caches
- http://docs.confluent.io/current/streams/developer-guide.html#streams-developer-guide-memory-management