consumer reading __consumer_offsets delivers unrea

2020-04-30 03:06发布

I am trying to consume from the __consumer_offsets topic as it seemed this may be the easiest way to retrieve kafka metrics about consumers like message lag etc. The ideal way is accessing it from jmx but wanted to try this first and the messages that come back seem to be encrypted or in unreadable form. Tried adding stringDeserializer property as well. Does anyone have any suggestions on how to correct this? Again the reference to this being a duplicate of

duplicate consumer_offset

is not helpful as it does not reference my issue which is to read message as a string in java. Updated the code as well to try a consumerRecord using kafka.client consumer.

consumerProps.put("exclude.internal.topics",  false);
consumerProps.put("group.id" , groupId);
consumerProps.put("zookeeper.connect", zooKeeper);


consumerProps.put("key.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer");  
consumerProps.put("value.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer");

ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
ConsumerConnector consumer = 
kafka.consumer.Consumer.createJavaConsumerConnector(
       consumerConfig);

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
   consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

for (KafkaStream stream : streams) {

    ConsumerIterator<byte[], byte[]> it = stream.iterator();

    //errorReporting("...CONSUMER-KAFKA CONNECTION SUCCESSFUL!");

    while (it.hasNext()) {
         try {

             String mesg = new String(it.next().message());
             System.out.println( mesg);

code changes:

try {       
    // errorReporting("CONSUMER-KAFKA CONNECTION INITIATING...");   
    Properties consumerProps = new Properties();
    consumerProps.put("exclude.internal.topics",  false);
    consumerProps.put("group.id" , "test");
    consumerProps.put("bootstrap.servers", servers);
    consumerProps.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");  
    consumerProps.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

    //ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
    //ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
    //       consumerConfig);

    //Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    //topicCountMap.put(topic, new Integer(1));
    //Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    //List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    KafkaConsumer<String, String> kconsumer = new KafkaConsumer<>(consumerProps); 
    kconsumer.subscribe(Arrays.asList(topic)); 

    try {
        while (true) {
            ConsumerRecords<String, String> records = kconsumer.poll(10);

            for (ConsumerRecord<String, String> record : records)

                System.out.println(record.offset() + ": " + record.value());
        }
    } finally {
          kconsumer.close();
    }    

And the snapshot of what the message looks like below; at the bottom of the image:

consumer offset

1条回答
Fickle 薄情
2楼-- · 2020-04-30 03:32

While it's possible to directly read from the __consumer_offsets topic, this is not the recommended or easiest method.

If you can use the Kafka 2.0, the best is to use the AdminClient APIs to describe groups:


In case, you absolutely want to read directly form __consumer_offset, you need to decode the record to make them human readable. This can be done using the GroupMetadataManager class:

This answer from the question you linked contains skeleton code to perform all that.

Also note that you should not deserialize the records as string but instead keep them as raw bytes for these methods to be able to decode them correctly.

查看更多
登录 后发表回答