Kafka - This server is not the leader for that top

2019-04-09 02:48发布

问题:

I have two broker kafka 0.10.2.0 cluster.Replication factor is 2. I am running 1.0.0 kafka stream application against this Kafka. In my kafka stream application, producer config has retries = 10 and retry.backoff.ms = 100

After running few minutes, I observed following logs in Kakfa server.log. Due to this Kafka stream application is throwing 'NOT_LEADER_FOR_PARTITION' exception.

What may be the possible reason? Please help me.

[2017-12-12 10:26:02,583] ERROR [ReplicaFetcherThread-0-1], Error for partition [__consumer_offsets,22] to broker 1:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)

回答1:

Each topic is served by one or multiple Brokers - one is leader and the remaining brokers are followers.

A producer needs to send new messages to the leader Broker which internally replicate the data to all followers.

I assume, that your producer client does not connect to the correct Broker, its connect to a follower instead of the leader, and this follower rejects your send request.

Try to run ./kafka-topics.sh --zookeeper localhost:2181 --topic your_topic --describe

Topic:your_topic    PartitionCount:3    ReplicationFactor:1 Configs:retention.ms=14400000
Topic: your_topic   Partition: 0    Leader: 2   Replicas: 2 Isr: 2
Topic: your_topic   Partition: 1    Leader: 0   Replicas: 0 Isr: 0
Topic: your_topic   Partition: 2    Leader: 1   Replicas: 1 Isr: 1

In this example you can see that your_topic have 3 partitions meaning all 3 brokers are leaders of that topic each on different partition, s.t broker 2 is leader on partition 0 and broker 0 and broker 1 are followers on partition 0.



回答2:

Try setting these properties and see if it helps resolve the issue:

props.put(ProducerConfig.RETRIES_CONFIG, 10);  //increase to 10 from default of 0
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,   
    Integer.toString(Integer.MAX_VALUE)); // increase to infinity from default of 300 s

(Source)