Error reading field 'topics': java.nio.BufferUnderflowException

Posted 2019-04-10 23:41

Question:

I am using the Kafka 0.9.0 client to consume messages from two brokers running on a remote system. My producer works fine and is able to send messages to the brokers, but my consumer is not able to consume them. The consumer and producer run on my local system, and the two brokers are on AWS. Whenever I try to run the consumer, the following error appears in the broker logs:

ERROR Closing socket for /122.172.17.81 because of error (kafka.network.Processor)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topics': java.nio.BufferUnderflowException
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:66)
        at org.apache.kafka.common.requests.JoinGroupRequest.parse(JoinGroupRequest.java:85)
        at kafka.api.JoinGroupRequestAndHeader$.readFrom(JoinGroupRequestAndHeader.scala:29)
        at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
        at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
        at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50)
        at kafka.network.Processor.read(SocketServer.scala:450)
        at kafka.network.Processor.run(SocketServer.scala:340)
        at java.lang.Thread.run(Thread.java:745)

My consumer code is as follows:

package Kafka1.K1;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class HelloKafkaConsumer {
    public static void main(String[] args) {
        String[] topics = new String[]{"loader1"};

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP1:9092,IP2:9093");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put("heartbeat.interval.ms", "500");
        props.put("session.timeout.ms", "1000");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "10000");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topics));

        while (true) {
            // Poll for records for up to one second
            ConsumerRecords<String, String> records = consumer.poll(1000);

            // Process whatever records came back
            for (ConsumerRecord<String, String> record : records) {
                String key = record.key();
                String value = record.value();
                System.out.println(key + ":" + value);
            }
        }
    }
}

Can someone help?

Answer 1:

This issue occurs when the Kafka cluster is running an older version (0.8.x) while the client used to access it is a newer version (0.9.x). The 0.9 client sends requests in a newer wire format, including the JoinGroupRequest used for the new consumer's group coordination, which the 0.8 broker cannot parse; that is exactly why your broker log shows a BufferUnderflowException while reading the 'topics' field of a JoinGroupRequest.

There are two simple solutions, depending on your requirements:

  1. Downgrade the client so that it matches the broker version (see the sketch after this list).
  2. Upgrade the Kafka cluster to 0.9.x or later.
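
If you choose option 1, note that the new KafkaConsumer API used in your code only exists for 0.9+ clients; an 0.8.x client consumes through the old high-level consumer, which bootstraps via ZooKeeper rather than via the brokers. Below is a minimal sketch of that older API, assuming 0.8.2.x client jars on the classpath and ZooKeeper reachable at ZK_HOST:2181 (ZK_HOST is a placeholder for your actual ZooKeeper address):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class OldApiConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // The 0.8 high-level consumer connects through ZooKeeper, not the brokers
        props.put("zookeeper.connect", "ZK_HOST:2181");
        props.put("group.id", "test");
        props.put("auto.commit.interval.ms", "10000");

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Ask for a single stream for the "loader1" topic
        Map<String, Integer> topicCounts = new HashMap<String, Integer>();
        topicCounts.put("loader1", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(topicCounts);

        // Block on the stream and print each message as it arrives
        ConsumerIterator<byte[], byte[]> it = streams.get("loader1").get(0).iterator();
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}

Whichever option you pick, the underlying rule is that the client must not be newer than the broker: a newer broker can generally serve older clients, but a 0.9 client cannot talk to an 0.8 broker.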