Java: How to get the number of messages in a topic in Apache Kafka

Posted 2019-01-10 09:38

I am using Apache Kafka for messaging. I have implemented the producer and consumer in Java. How can I get the number of messages in a topic?

14 answers
beautiful°
Post #2 · 2019-01-10 10:02

From a consumer's point of view, the only way that comes to mind is to actually consume the messages and count them.

The Kafka broker exposes JMX counters for the number of messages received since start-up, but you cannot know how many of them have already been purged.

In most common scenarios, messages in Kafka are best seen as an infinite stream, and a discrete count of how many are currently kept on disk is not relevant. Furthermore, things get more complicated in a cluster of brokers, each of which holds only a subset of a topic's messages.

别忘想泡老子
Post #3 · 2019-01-10 10:02

In recent versions of Kafka Manager, there is a column titled "Summed Recent Offsets".


祖国的老花朵
Post #4 · 2019-01-10 10:02

I haven't tried this myself, but it seems to make sense.

You can also use kafka.tools.ConsumerOffsetChecker.

倾城 Initia
Post #5 · 2019-01-10 10:02

Using the Java client from Kafka 1.0.0 (the kafka_2.11-1.0.0 distribution), you can do the following:

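    import java.util.Collections;
    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    // props holds the consumer configuration (bootstrap.servers, group.id, deserializers)
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("test"));
    while (true) {
        // poll(long) is the Kafka 1.0 API; from 2.0 on, prefer poll(Duration.ofMillis(100))
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

            // after each message, query the end offset (the offset of the next message
            // to be written) of every partition currently assigned to this consumer
            Set<TopicPartition> partitions = consumer.assignment();
            Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
            for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
                System.out.printf("partition %s is at %d%n", entry.getKey(), entry.getValue());
            }
        }
    }

The output looks something like this:

    offset = 10, key = null, value = un
    partition test-0 is at 13
    offset = 11, key = null, value = deux
    partition test-0 is at 13
    offset = 12, key = null, value = trois
    partition test-0 is at 13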
ゆ 、 Hurt°
Post #6 · 2019-01-10 10:02

If you have access to the broker's JMX interface, the start and end offsets are exposed as:

    kafka.log:type=Log,name=LogStartOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
    kafka.log:type=Log,name=LogEndOffset,topic=TOPICNAME,partition=PARTITIONNUMBER

Replace TOPICNAME and PARTITIONNUMBER with your topic name and partition number. Bear in mind that you need to check each replica of a given partition, or find out which broker is the leader for that partition (and this can change over time).
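As a hedged sketch, the two gauges can be read from Java with only the JDK's javax.management API. Several things here are assumptions rather than anything the answer states: the broker must have remote JMX enabled (for example by setting the JMX_PORT environment variable before starting it), the JMX service URL uses the common RMI form, each gauge is assumed to expose its current value through an attribute named `Value`, and the class `LogOffsetsViaJmx` is a hypothetical name. The difference end minus start is the number of offsets retained by that one broker's replica; it equals the message count only when the log has no gaps (no compaction, no transaction markers).

```java
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Hypothetical helper: reads a partition's LogStartOffset / LogEndOffset gauges over JMX.
class LogOffsetsViaJmx {

    // Builds e.g. kafka.log:type=Log,name=LogEndOffset,topic=test,partition=0
    static ObjectName offsetBean(String metric, String topic, int partition) {
        try {
            return new ObjectName("kafka.log:type=Log,name=" + metric
                    + ",topic=" + topic + ",partition=" + partition);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("invalid MBean name for topic " + topic, e);
        }
    }

    // Assumption: each gauge exposes its current value as the attribute "Value".
    static long gauge(MBeanServerConnection conn, ObjectName name) throws Exception {
        return ((Number) conn.getAttribute(name, "Value")).longValue();
    }

    // Offsets retained by the replica on the broker behind `conn` (end - start).
    static long retainedOffsets(MBeanServerConnection conn, String topic, int partition)
            throws Exception {
        long start = gauge(conn, offsetBean("LogStartOffset", topic, partition));
        long end = gauge(conn, offsetBean("LogEndOffset", topic, partition));
        return end - start;
    }

    // Connects to a broker's remote JMX endpoint; host and port are assumptions.
    static long retainedOffsets(String host, int jmxPort, String topic, int partition)
            throws Exception {
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://" + host + ":" + jmxPort + "/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            return retainedOffsets(connector.getMBeanServerConnection(), topic, partition);
        }
    }
}
```

Because each call reads a single broker, point it at the current leader of the partition, or query every replica and compare.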

Alternatively, you can use the KafkaConsumer methods beginningOffsets and endOffsets.
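A minimal sketch of that approach: for each partition, the retained range is the end offset minus the beginning offset, and summing over all partitions gives an estimate for the whole topic. The helper below (`OffsetMath` and `retainedMessages` are hypothetical names) uses only JDK types, so it can be fed the `Map<TopicPartition, Long>` results of the two consumer calls directly. Treat the result as an upper bound: on compacted topics, or topics written with transactions, some offsets do not correspond to readable messages.

```java
import java.util.Map;

// Hypothetical helper: sums (end - beginning) over all partitions.
class OffsetMath {
    static <P> long retainedMessages(Map<P, Long> beginning, Map<P, Long> end) {
        long total = 0;
        for (Map.Entry<P, Long> e : end.entrySet()) {
            Long start = beginning.get(e.getKey());
            if (start == null) {
                throw new IllegalArgumentException("no beginning offset for " + e.getKey());
            }
            // end offset = offset of the next message to be written
            total += e.getValue() - start;
        }
        return total;
    }
}
```

With a consumer, the two maps would come from `consumer.beginningOffsets(parts)` and `consumer.endOffsets(parts)`, where `parts` is built from `consumer.partitionsFor(topic)` by mapping each PartitionInfo to `new TopicPartition(info.topic(), info.partition())`.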

混吃等死
Post #7 · 2019-01-10 10:03

Use PrestoDB: https://prestodb.io/docs/current/connector/kafka-tutorial.html

A powerful SQL engine, originally developed at Facebook, that connects to several data sources (Cassandra, Kafka, JMX, Redis, ...).

PrestoDB runs as a server with optional workers (there is a standalone mode without extra workers); you then use a small executable JAR (the Presto CLI) to run queries.

Once the Presto server is properly configured, you can use traditional SQL:

    SELECT count(*) FROM TOPIC_NAME;