Counting the number of messages stored in a Kafka topic

Posted 2020-05-19 05:09

I'm using Kafka version 0.9.0.0 and I want to count the number of messages in a topic without using the admin script kafka-console-consumer.sh.

I have tried all the commands in the answers to "Java, How to get number of messages in a topic in apache kafka", but none of them yields the result. Can anyone help me out here?

4 answers
迷人小祖宗
#2 · 2020-05-19 05:40

You could try to execute the command below:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test-topic --time -1

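Each output line has the form topic:partition:offset. With --time -1 the offset is the latest offset of that partition, so sum the offsets across all partitions. The sum equals the message count only if every partition still starts at offset 0.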
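If the summing has to happen in Java rather than in the shell, the step above can be sketched as follows. This is a minimal sketch, not part of the original answer: the class name OffsetSum, the method sumOffsets, and the sample lines are hypothetical, and it assumes the output has already been captured as a list of topic:partition:offset strings. It also subtracts a second run with --time -2 (earliest offsets) so that partitions whose old segments were deleted by retention are not overcounted.

```java
import java.util.Arrays;
import java.util.List;

public class OffsetSum {
    // Sums the last field of lines shaped like "topic:partition:offset".
    static long sumOffsets(List<String> lines) {
        long total = 0;
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue; // skip blank lines in the captured output
            }
            // Topic names cannot contain ':', so the text after the last ':' is the offset.
            String offset = trimmed.substring(trimmed.lastIndexOf(':') + 1);
            total += Long.parseLong(offset);
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical output captured from GetOffsetShell with --time -1 (latest offsets).
        List<String> latest = Arrays.asList("test-topic:0:120", "test-topic:1:95", "test-topic:2:110");
        // Hypothetical output captured with --time -2 (earliest offsets).
        List<String> earliest = Arrays.asList("test-topic:0:20", "test-topic:1:0", "test-topic:2:10");
        // Latest minus earliest gives the size of the offset range across all partitions.
        System.out.println(sumOffsets(latest) - sumOffsets(earliest)); // prints 295
    }
}
```

The result is the size of the offset range, which can still be larger than the number of messages actually readable if the topic is compacted.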

Updated: Java implementation

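import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Note: endOffsets/beginningOffsets need kafka-clients 0.10.1 or newer,
// and poll(Duration) needs 2.0 or newer, so this does not compile against 0.9.0.0.
Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// ...... (remaining consumer properties, elided in the original answer)
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList("your_topic"));
    // Poll until the group coordinator has assigned partitions to this consumer.
    Set<TopicPartition> assignment;
    while ((assignment = consumer.assignment()).isEmpty()) {
        consumer.poll(Duration.ofMillis(100));
    }
    final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
    final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(assignment);
    assert (endOffsets.size() == beginningOffsets.size());
    assert (endOffsets.keySet().equals(beginningOffsets.keySet()));

    // Per partition, the offset range is end offset minus beginning offset; sum over partitions.
    long totalCount = beginningOffsets.entrySet().stream().mapToLong(entry -> {
            TopicPartition tp = entry.getKey();
            long beginningOffset = entry.getValue();
            long endOffset = endOffsets.get(tp);
            return endOffset - beginningOffset;
        }).sum();
    System.out.println(totalCount);
}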
聊天终结者
#3 · 2020-05-19 05:53

You can also do this using awk and a simple loop:

sum=0; for i in $(kafka-run-class kafka.tools.GetOffsetShell --broker-list broker:9092 --time -1 --topic topic_name | awk -F : '{print $3}'); do sum=$((sum+i)); done; echo "$sum"
一夜七次
#4 · 2020-05-19 05:57

Technically speaking, you can simply consume all the messages from the topic and count them:

Example:

kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9092 --topic XYZ --partition 0*

However, the kafka.tools.GetOffsetShell approach gives you offsets, not the actual number of messages in the topic. If the topic gets compacted, counting messages by consuming them and counting by reading offsets will give two different numbers.

Topic compaction: https://kafka.apache.org/documentation.html#design_compactionbasics
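To make the difference concrete, here is a small simulation in plain Java. It is only an illustration under simplifying assumptions, not how the broker implements compaction: the class name CompactionGap, the method survivorsAfterCompaction, and the sample keys are hypothetical, the whole partition is treated as already compacted, and tombstones are ignored. It keeps only the latest record per key and compares the number of surviving records with the size of the offset range.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CompactionGap {
    // Returns how many records survive if only the latest record per key is kept.
    static int survivorsAfterCompaction(String[] keysByOffset) {
        Map<String, Integer> lastOffsetForKey = new LinkedHashMap<>();
        for (int offset = 0; offset < keysByOffset.length; offset++) {
            // A later record with the same key replaces the earlier one.
            lastOffsetForKey.put(keysByOffset[offset], offset);
        }
        return lastOffsetForKey.size();
    }

    public static void main(String[] args) {
        // Hypothetical partition: the array index is the offset, the value is the record key.
        String[] keys = {"a", "b", "a", "c", "b", "a"};
        // Assumed model: beginning offset 0 and end offset 6, so the offset range is 6.
        long offsetBasedCount = keys.length;
        int consumedCount = survivorsAfterCompaction(keys);
        System.out.println(offsetBasedCount + " vs " + consumedCount); // prints "6 vs 3"
    }
}
```

In this model the offset-based count is 6 while a consumer reading the compacted partition would see only 3 records, which is the gap described above.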

Anthone
#5 · 2020-05-19 05:59

You can sum up all the counts with this:

.../bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list <<broker_1>>:9092,<<broker_2>>:9092... --topic <<your_topic_name>> --time -1 | while IFS=: read topic_name partition_id number; do echo "$number"; done | paste -sd+ - | bc