Rebalancing issue while reading messages in Kafka

2020-02-23 07:13发布

I am trying to read messages on Kafka topic, but I am unable to read it. The process gets killed after sometime, without reading any messages.

Here is the rebalancing error which I get:

[2014-03-21 10:10:53,215] ERROR Error processing message, stopping consumer:  (kafka.consumer.ConsoleConsumer$)
kafka.common.ConsumerRebalanceFailedException: topic-1395414642817-47bb4df2 can't rebalance after 4 retries
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:428)
    at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:718)
    at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:752)
    at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:142)
    at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:196)
    at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
Consumed 0 messages

I tried to run ConsumerOffsetChecker, and this is the error which I get. I have no clue whatsoever, how to resolve this. Anybody, any idea?

./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:9092 --topic mytopic --group  topic_group
Group           Topic                          Pid Offset          logSize         Lag             Owner
Exception in thread "main" org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
        at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
        at kafka.utils.ZkUtils$.readData(ZkUtils.scala:459)
        at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:59)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:89)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:88)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:152)
        at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
        at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:927)
        at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:956)
        at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
        ... 16 more

5条回答
再贱就再见
2楼-- · 2020-02-23 07:36

I have got similar problems recently. You can try to increase the consumer configurations rebalance.backoff.ms and zookeeper.session.timeout.ms to about 5-10 seconds.

The first parameter tell kafka to wait more before retrying rebalance. The second one tell kafka to be more patient while trying to connect to zookeeper.

Other configuration options can be found on the official documentation.

查看更多
▲ chillily
3楼-- · 2020-02-23 07:36

There is a bug in kafka.tools.ConsumerOffsetChecker. If the a particular Zookeeper node holding consumed offset information doesn't exit, the tool exits throwing the execption.

For example, suppose you have a consumer group "mygroup" and a topic "topictest". Then the offset for partition 2 is maintained in Znode: /consumers/mygroup/offsets/topictest/2.

If there is no entry for partition 2 of topic topictest in Znode, then consumer offsetchecker tool will exit while checking offset for partition 2. Basically, it will fail while checking the first partition "n" for which the Znode /consumers/mygroup/offsets/topictest/n is missing on Zookeeper.

查看更多
干净又极端
4楼-- · 2020-02-23 07:49

probably your brokers are offline and they are not able to connect to Zookeeper, have you tried running the console-consumer script available in the $KAFKA_ROOT_DIR/bin path for checking if you are able to consume from a specific topic.

查看更多
我命由我不由天
5楼-- · 2020-02-23 07:51

This probably means that the brokers did not create those nodes correctly when it connected to Zookeeper. The /consumer path should exist when you try to consume.

Here are a few paths for debugging:

Did you create any topics?

If so:

  1. How many partitions are there in the topic?
  2. Did you check that the topic metadata were correctly populated in the zookeeper?
  3. Can we see your consumer configuration?

If not:

  1. Then you need to create a topic using the script $KAFKA_DIR/bin/kafka-create-topic.sh. Look inside the script for usage details.
  2. Once you make a topic, you need to create a consumer with a group ID that has not been used before, otherwise you will not start fresh.
查看更多
ゆ 、 Hurt°
6楼-- · 2020-02-23 07:56

Another Issue might be because of jar conflicts. If you have the same jar with different versions stored in library folder. This Issue may arise. jars like scala-library ,zkclient, zookeeper, kafka-client should not be duplicated with different versions.

查看更多
登录 后发表回答