Java, How to get number of messages in a topic in

Posted 2019-01-10 09:57

Question:

I am using Apache Kafka for messaging. I have implemented the producer and consumer in Java. How can we get the number of messages in a topic?

Answer 1:

The only way that comes to mind for this from a consumer point of view is to actually consume the messages and count them then.

The Kafka broker exposes JMX counters for number of messages received since start-up but you cannot know how many of them have been purged already.

In most common scenarios, messages in Kafka are best seen as an infinite stream, and a discrete count of how many are currently being kept on disk is not relevant. Furthermore, things get more complicated when dealing with a cluster of brokers, each of which holds only a subset of the messages in a topic.



Answer 2:

It is not Java, but it may be useful:

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list <broker>:<port> \
  --topic <topic-name> --time -1 --offsets 1 \
  | awk -F ":" '{sum += $3} END {print sum}'
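Note that --time -1 returns the latest offset of each partition, so the sum equals the message count only if nothing has been removed by retention yet. Running the tool a second time with --time -2 returns the earliest offsets, and subtracting the two gives roughly the number of messages still retained. The subtraction could be sketched in Java as below, assuming the topic:partition:offset lines from both runs have already been captured; the class name, method names, and sample values are made up for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RetainedCount {
    // Parses GetOffsetShell lines such as "my-topic:2:6138" into partition -> offset.
    // Assumes --offsets 1, so the third field holds a single number.
    static Map<Integer, Long> parse(List<String> lines) {
        Map<Integer, Long> offsets = new HashMap<>();
        for (String line : lines) {
            String[] fields = line.trim().split(":");
            if (fields.length < 3 || fields[2].isEmpty()) {
                continue; // skip blank or incomplete lines
            }
            offsets.put(Integer.parseInt(fields[1]), Long.parseLong(fields[2]));
        }
        return offsets;
    }

    // Sums (latest - earliest) over all partitions found in the "latest" output.
    static long retained(List<String> latest, List<String> earliest) {
        Map<Integer, Long> start = parse(earliest);
        long total = 0;
        for (Map.Entry<Integer, Long> e : parse(latest).entrySet()) {
            total += e.getValue() - start.getOrDefault(e.getKey(), 0L);
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical sample output from the two runs (--time -1 and --time -2).
        List<String> latest = List.of("my-topic:0:6137", "my-topic:1:6123", "my-topic:2:6138");
        List<String> earliest = List.of("my-topic:0:100", "my-topic:1:0", "my-topic:2:38");
        System.out.println(retained(latest, earliest));
    }
}
```

On compacted or transactional topics the offset difference can still overstate the number of readable records, so treat the result as an upper bound there.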


Answer 3:

I actually use this for benchmarking my POC. The tool you want to use is ConsumerOffsetChecker. You can run it from a bash script like the one below.

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker  --topic test --zookeeper localhost:2181 --group testgroup

The result was shown in a screenshot in the original post; the highlighted value, 999, is the number of messages currently in the topic.

Update: ConsumerOffsetChecker has been deprecated since 0.9.0.0; you may want to start using ConsumerGroupCommand instead.



Answer 4:

Use https://prestodb.io/docs/current/connector/kafka-tutorial.html

Presto is a SQL engine from Facebook that connects to several data sources (Cassandra, Kafka, JMX, Redis ...).

PrestoDB runs as a server with optional workers (there is a standalone mode without extra workers); you then use a small executable JAR (called the Presto CLI) to run queries.

Once you have configured the Presto server properly, you can use traditional SQL:

SELECT count(*) FROM TOPIC_NAME;


Answer 5:

Apache Kafka command to get the unhandled messages on all partitions of a topic:

kafka-run-class kafka.tools.ConsumerOffsetChecker \
    --topic test --zookeeper localhost:2181 \
    --group test_group

Prints:

Group      Topic        Pid Offset          logSize         Lag             Owner
test_group test         0   11051           11053           2               none
test_group test         1   10810           10812           2               none
test_group test         2   11027           11028           1               none

Column 6 (Lag) is the number of unhandled messages. Add them up like this:

kafka-run-class kafka.tools.ConsumerOffsetChecker \
    --topic test --zookeeper localhost:2181 \
    --group test_group 2>/dev/null | awk 'NR>1 {sum += $6}
    END {print sum}'

awk reads the rows, skips the header line, adds up the 6th column, and prints the sum at the end.

Prints:

5
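Since the question asks about Java, the same summation can be sketched without awk. This assumes the tool's output has already been read into a list of lines in the column layout shown above; the class and method names are hypothetical.

```java
import java.util.List;

final class LagSum {
    // Skips the header row and adds up the 6th whitespace-separated column (Lag).
    static long totalLag(List<String> lines) {
        long sum = 0;
        for (int i = 1; i < lines.size(); i++) {
            String[] cols = lines.get(i).trim().split("\\s+");
            if (cols.length >= 6) { // ignore blank or truncated rows
                sum += Long.parseLong(cols[5]);
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        // The sample rows from the output above.
        List<String> output = List.of(
                "Group      Topic        Pid Offset          logSize         Lag             Owner",
                "test_group test         0   11051           11053           2               none",
                "test_group test         1   10810           10812           2               none",
                "test_group test         2   11027           11028           1               none");
        System.out.println(totalLag(output));
    }
}
```

Keep in mind that this is the lag of one consumer group, that is, messages not yet consumed by that group, rather than the total number of messages in the topic.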


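Answer 6:

To get the number of messages stored for the topic, you can seek the consumer to the end and to the beginning of the stream for each partition and sum the differences:

// Imports needed: java.util.*, java.util.function.Function, java.util.stream.Collectors,
// org.apache.kafka.common.TopicPartition
// Assumes `consumer` is a configured KafkaConsumer and `topic` is the topic name.
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
        .map(p -> new TopicPartition(topic, p.partition()))
        .collect(Collectors.toList());
consumer.assign(partitions);
// An empty collection means "all currently assigned partitions".
consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream()
        .collect(Collectors.toMap(Function.identity(), consumer::position));
consumer.seekToBeginning(Collections.emptySet());
// Sum (end offset - beginning offset) over all partitions.
System.out.println(partitions.stream()
        .mapToLong(p -> endPartitions.get(p) - consumer.position(p))
        .sum());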


Answer 7:

Sometimes the interest is in knowing the number of messages in each partition, for example, when testing a custom partitioner. The following steps have been tested with Kafka 0.10.2.1-2 from Confluent 3.2. Given a Kafka topic, kt, run the following command line:

$ kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list host01:9092,host02:9092,host02:9092 --topic kt

This prints output like the following, showing the latest offset of each of the three partitions, which equals the message count as long as nothing has been deleted by retention:

kt:2:6138
kt:1:6123
kt:0:6137

The number of lines could be more or less depending on the number of partitions for the topic.



Answer 8:

In most recent versions of Kafka Manager, there is a column titled Summed Recent Offsets.



Answer 9:

I haven't tried this myself, but it seems to make sense.

You can also use kafka.tools.ConsumerOffsetChecker (source).



Answer 10:

Using the Java client of Kafka 2.11-1.0.0 (the Scala 2.11 build of Kafka 1.0.0), you can do the following:

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("test"));
    while(true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

            // after each message, query the number of messages of the topic
            Set<TopicPartition> partitions = consumer.assignment();
            Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
            for(TopicPartition partition : offsets.keySet()) {
                System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition));
            }
        }
    }

The output looks something like this:

offset = 10, key = null, value = un
partition test is at 13
offset = 11, key = null, value = deux
partition test is at 13
offset = 12, key = null, value = trois
partition test is at 13


Answer 11:

./kafka-console-consumer.sh --from-beginning --new-consumer --bootstrap-server yourbroker:9092 --property print.key=true --property print.value=false --property print.partition --topic yourtopic --timeout-ms 5000 | tail -n 10|grep "Processed a total of"



Answer 12:

Excerpts from Kafka docs

Deprecations in 0.9.0.0

The kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) has been deprecated. Going forward, please use kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) for this functionality.

I am running a Kafka broker with SSL enabled for both server and client. These are the commands I use:

kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config
kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x

where /tmp/ssl_config contains:

security.protocol=SSL
ssl.truststore.location=truststore_file_path.jks
ssl.truststore.password=truststore_password
ssl.keystore.location=keystore_file_path.jks
ssl.keystore.password=keystore_password
ssl.key.password=key_password


Answer 13:

If you have access to the server's JMX interface, the start and end offsets are available at:

kafka.log:type=Log,name=LogStartOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
kafka.log:type=Log,name=LogEndOffset,topic=TOPICNAME,partition=PARTITIONNUMBER

(You need to replace TOPICNAME and PARTITIONNUMBER.) Bear in mind that you either need to check each of the replicas of a given partition, or find out which broker is the leader for that partition (and this can change over time).
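Reading these two gauges from Java might look like the sketch below. It assumes the broker was started with remote JMX enabled, that the gauges expose their number through the "Value" attribute (as Kafka's broker gauges usually do), and that the connection goes to the broker hosting the leader replica; the class and method names are hypothetical.

```java
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

final class LogOffsetJmx {
    // Builds the MBean name for LogStartOffset or LogEndOffset of one partition.
    static ObjectName gauge(String name, String topic, int partition) {
        try {
            return new ObjectName("kafka.log:type=Log,name=" + name
                    + ",topic=" + topic + ",partition=" + partition);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // End offset minus start offset for one partition on the connected broker.
    static long retained(MBeanServerConnection conn, String topic, int partition)
            throws Exception {
        // Assumption: the gauge value is exposed as the "Value" attribute.
        Number start = (Number) conn.getAttribute(gauge("LogStartOffset", topic, partition), "Value");
        Number end = (Number) conn.getAttribute(gauge("LogEndOffset", topic, partition), "Value");
        return end.longValue() - start.longValue();
    }

    public static void main(String[] args) throws Exception {
        // Usage: java LogOffsetJmx.java <jmx-host:port> <topic> <partition>
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://" + args[0] + "/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            System.out.println(retained(connector.getMBeanServerConnection(),
                    args[1], Integer.parseInt(args[2])));
        }
    }
}
```

To cover a whole topic, this would have to be repeated for every partition, each time against whichever broker currently leads it.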

Alternatively, you can use Kafka Consumer methods beginningOffsets and endOffsets.
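With those two consumer methods, the count is the per-partition difference between the two returned maps, summed up. Here is a minimal sketch of that arithmetic, kept free of the Kafka dependency so it runs on its own; the class and method names are hypothetical, and in real use the key type would be TopicPartition.

```java
import java.util.Map;

final class OffsetCount {
    // Sums (end - beginning) over all partitions present in the end-offset map.
    // K stands in for org.apache.kafka.common.TopicPartition.
    static <K> long count(Map<K, Long> beginning, Map<K, Long> end) {
        long total = 0;
        for (Map.Entry<K, Long> e : end.entrySet()) {
            total += e.getValue() - beginning.getOrDefault(e.getKey(), 0L);
        }
        return total;
    }

    public static void main(String[] args) {
        // Made-up offsets for two partitions, keyed by a plain string for the demo.
        Map<String, Long> beginning = Map.of("kt-0", 5L, "kt-1", 0L);
        Map<String, Long> end = Map.of("kt-0", 12L, "kt-1", 7L);
        System.out.println(count(beginning, end));
    }
}
```

With a real consumer, the call would be along the lines of count(consumer.beginningOffsets(partitions), consumer.endOffsets(partitions)), where partitions is the collection of TopicPartition objects for the topic. Unlike the seek-based approach, this does not change the consumer's position.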



Answer 14:

The simplest way I've found is to use the Kafdrop REST API /topic/topicName and specify the key: "Accept" / value: "application/json" header in order to get back a JSON response.

This is documented here.