Kafka Streams - first WordCount example doesn't count correctly

Posted 2019-09-18 08:48

I'm studying Kafka Streams and I have a problem with the first WordCount example in Java 8, taken from the documentation.

I'm using the latest available versions of Kafka Streams and Kafka Connect, and the WordCount lambda-expressions example.

I follow these steps: I create an input topic and an output topic in Kafka, start the streaming app, and then load the input topic by inserting some words from a .txt file.
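For reference, here is a sketch of what I am running, essentially the documented WordCount topology; the topic names yourInputTopic and yourOutputTopic are placeholders for my actual topics:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class WordCountApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app"); // also the consumer group id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // as in the demo

        StreamsBuilder builder = new StreamsBuilder();
        // Split each input line into words, group by word, and count occurrences.
        KStream<String, String> textLines = builder.stream("yourInputTopic");
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count();
        wordCounts.toStream().to("yourOutputTopic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}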

On the first count, the words in the output topic are grouped correctly, but the counts are wrong. If I then insert the same words again, the subsequent counts (incremented from the previous, incorrect ones) are all correct.

If I look at the input topic dump with a console consumer, it's loaded properly and there is no dirty data.

Why is the count wrong the first time?

Example [FIRST DATA]: (input Topic in Kafka) hi hi mike mike test

(App streaming is running)

(output Topic) hi 12 mike 4 test 3 (seemingly random counts)

[SUBSEQUENT DATA - posting the same words to the input topic]

(output Topic) hi 14 mike 6 test 4

[NEW ATTEMPT]

(output Topic) hi 16 mike 8 test 5

and so on....

1 answer

贪生不怕死 · 2019-09-18 09:32

The WordCount demo in Apache Kafka has the following lines:

// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
// Note: To re-run the demo, you need to use the offset reset tool:
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
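For completeness, that reset tool is invoked roughly as follows (the application id and topic name below are placeholders, and flag names vary slightly between Kafka versions):

$ bin/kafka-streams-application-reset.sh --application-id wordcount-app \
      --bootstrap-servers localhost:9092 --input-topics yourInputTopic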

This means that, when you restart the app, it will read its input topic from the very beginning ("earliest") if and only if there are no committed consumer offsets for the WordCount app stored in Kafka. An app's consumer offsets expire in Kafka after a certain period of app inactivity; the default is 24 hours (cf. the offsets.retention.minutes broker configuration).
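You can check whether committed offsets still exist for your app by describing its consumer group; a Kafka Streams app's group id equals its application.id ("wordcount-app" below is a placeholder):

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group wordcount-app

If the group does not exist or shows no committed offsets, the app will fall back to the "earliest" reset policy on its next start.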

I could imagine that the following happened:

  • You experimented with Kafka some time earlier and entered test data to the input topic.
  • Then you took a >24 hours break before resuming your experiments.
  • Now the app, when it restarted, reverted to re-reading the input topic all the way from the beginning, thereby picking up older test input data and thus leading to "inflated" counts.

"If I look at the input topic dump with a console consumer, it's loaded properly and there is no dirty data."

You can verify my hypothesis by looking at the input topic again with the console consumer, this time adding the CLI option --from-beginning (see https://kafka.apache.org/documentation/#quickstart_consume).

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yourInputTopic --from-beginning

This will show you all the available data in the topic "yourInputTopic" -- minus any data that might have been purged from the Kafka topics in the meantime (the default broker configuration will purge data that is older than 7 days, cf. log.retention.hours).
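If you would rather not re-read old test data after such a break, one option (my suggestion, not part of the demo code) is to fall back to the latest offsets instead:

// Assumption: this trades re-reading old data for potentially skipping
// records produced while the app was stopped and its offsets had expired.
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");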
