Kafka halting because log truncation is not allowed

Posted 2019-07-16 07:26

Question:

We are facing an issue with our Kafka setup: every couple of days one of our Kafka nodes goes down with this error.

 Halting because log truncation is not allowed for topic __consumer_offsets,
Current leader 11's latest offset 123 is less than replica 13's latest offset 234.

Each time, a different topic is mentioned in the error log. We have 3 Kafka nodes and 3 ZooKeeper nodes. Could you tell us what is causing this issue and how it can be fixed?

This is the broker code that performs this check and raises the error:

 /**
 * Unclean leader election: A follower goes down, in the meanwhile the leader keeps appending messages. The follower comes back up
 * and before it has completely caught up with the leader's logs, all replicas in the ISR go down. The follower is now uncleanly
 * elected as the new leader, and it starts appending messages from the client. The old leader comes back up, becomes a follower
 * and it may discover that the current leader's end offset is behind its own end offset.
 *
 * In such a case, truncate the current follower's log to the current leader's end offset and continue fetching.
 *
 * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
 */
val leaderEndOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP)

if (leaderEndOffset < replica.logEndOffset.messageOffset) {
  // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
  // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
  // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
  if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
    ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable) {
    // Log a fatal error and shutdown the broker to ensure that data loss does not occur unexpectedly.
    fatal(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader " +
      s"${sourceBroker.id}'s latest offset $leaderEndOffset is less than replica ${brokerConfig.brokerId}'s latest " +
      s"offset ${replica.logEndOffset.messageOffset}")
    throw new FatalExitError
  }

Thanks

Answer 1:

This happens with 0.10.0 and occurs even with min.insync.replicas=2.

The leader of a partition writes to its followers before committing the messages itself (especially for topics written with acks=all, such as __consumer_offsets). When a short network interruption occurs, a follower may recover quickly, before the messages have been written on the leader, and the replica then halts because unclean leader election is not allowed. This was a known issue that was fixed in 0.11.0.

One possible solution would be to set unclean.leader.election.enable=true for topics like __consumer_offsets and then restart the brokers. According to the Kafka docs:

unclean.leader.election.enable: Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss.

When a broker crashes, the Controller switches the leadership of its partitions, choosing one of the replicas in the ISR as the new partition leader. If no such replica is available, you won't be able to write to or read from that partition. By setting unclean.leader.election.enable to true, the first replica that becomes available is elected as the partition leader even if it is not in the ISR, and therefore some messages might be lost!
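
For reference, the topic-level override is normally applied with the kafka-configs.sh tool, but since the question already quotes the broker's 0.10.x internals, here is a minimal Scala sketch of the same change using those APIs (AdminUtils, ZkUtils, ConfigType). The ZooKeeper address localhost:2181 and the timeouts are assumptions; adjust them to your ensemble.

 import java.util.Properties

 import kafka.admin.AdminUtils
 import kafka.server.ConfigType
 import kafka.utils.ZkUtils

 object EnableUncleanLeaderElection {
   def main(args: Array[String]): Unit = {
     // Assumed ZooKeeper connection string and timeouts; adjust for your environment.
     val zkUtils = ZkUtils("localhost:2181", 30000, 30000, false)
     try {
       // Fetch the existing topic-level overrides so they are preserved.
       val props: Properties = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, "__consumer_offsets")
       props.put("unclean.leader.election.enable", "true")
       // Write the updated overrides back to ZooKeeper.
       AdminUtils.changeTopicConfig(zkUtils, "__consumer_offsets", props)
     } finally {
       zkUtils.close()
     }
   }
 }

Whether done with the CLI or a snippet like this, it only changes the topic-level override; the broker-level default stays as configured in server.properties.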

However, in order to solve this issue properly, I would suggest upgrading to a more stable version (if you are still using 0.10.0).