We have a Kafka cluster consists of 6 nodes. Five of the 6 nodes have zookeeper.
A spark streaming job is reading from a streaming server, do some processing, and send the result to Kafka.
From time to time the spark job got stuck, no data is sent to Kafka, and the job is restarted.
The job keeps stuck and restarted until we manually restart the Kafka cluster. After restarting Kafka everything is working smoothly.
Checking the Kafka logs we found this exception is thrown several times
2017-03-10 05:12:14,177 ERROR state.change.logger: Controller 133 epoch 616 initiated state change for partition [live_stream_2,52] from OfflinePartition to OnlinePartition failed
kafka.common.NoReplicaOnlineException: No broker in ISR for partition [gnip_live_stream_2,52] is alive. Live brokers are: [Set(133, 137, 134, 135, 143)], ISR brokers are: [142]
at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:66)
at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:345)
at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:205)
at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:120)
at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:117)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:117)
at kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:70)
at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:333)
at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:164)
at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84)
at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146)
at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:259)
at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:823)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
The exception above is thrown for an unused topic (live_stream_2
) but it is thrown also for a used topic with a little difference.
Here is the exception for the used topic
2017-03-10 12:05:18,535 ERROR state.change.logger: Controller 133 epoch 620 initiated state change for partition [gnip_live_stream,3] from OfflinePartition to OnlinePartition failed
kafka.common.NoReplicaOnlineException: No broker in ISR for partition [live_stream,3] is alive. Live brokers are: [Set(133, 134, 135, 137)], ISR brokers are: [136]
at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:66)
at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:345)
at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:205)
at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:120)
at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:117)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:117)
at kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:70)
at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:333)
at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:164)
at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84)
at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146)
at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:259)
at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:823)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
In the first exception, it says the ISR broker list for partition 52 contains only the broker with ID 142 which is weird because the cluster has no broker with this id.
In the second exception, it says the ISR broker list for partition 3 contains only the broker with ID 136 which is not existing in the broker live list.
I suspect there is stale data in zookeeper that cause the first exception and for some reason broker 136 was down at specific time which causes the second exception.
My questions
1- Could those exceptions be the reason of Kafka (and consequently the spark job) to stuck?
2- How to solve it?