Couldn't find leaders for Set([TOPICNAME,0])

Posted 2019-04-10 00:32

We are using Apache Spark 1.5.1 and kafka_2.10-0.8.2.1, and we fetch data from Kafka using the Kafka DirectStream API.
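
For context, the stream is wired up roughly like this (the app name, broker list, and batch interval below are illustrative, not our exact values):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("KafkaDirectStreamJob")
val ssc = new StreamingContext(conf, Seconds(10))

// Illustrative broker list; every broker in the cluster is listed here.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("normalized-tenant4")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)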

We created the topics in Kafka with the following settings:

ReplicationFactor: 1 and Replica: 1
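
That is, the topics were created along these lines (the ZooKeeper address is illustrative):

bin/kafka-topics.sh --create --zookeeper zk-host:2181 \
  --topic normalized-tenant4 --partitions 1 --replication-factor 1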

When all of the Kafka instances are running, the Spark job works fine. When one of the Kafka instances in the cluster is down, however, we get the exception reproduced below. After some time we restarted the disabled Kafka instance and tried to let the Spark job finish, but Spark had already terminated because of the exception. Because of this, we could not read the remaining messages in the Kafka topics.

ERROR DirectKafkaInputDStream:125 - ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([normalized-tenant4,0]))
ERROR JobScheduler:96 - Error generating jobs for time 1447929990000 ms
org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([normalized-tenant4,0]))
        at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
        at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
        at scala.Option.orElse(Option.scala:257)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Thanks in advance. Please help us resolve this issue.

2 Answers
放荡不羁爱自由
#2 · 2019-04-10 00:49

One reason for this type of error, where a leader cannot be found for the specified topic, is a problem with your Kafka server configuration.

Open your Kafka server config:

vim ./kafka/kafka-<your-version>/config/server.properties

In the "Socket Server Settings" section , provide IP for your host if its missing :

listeners=PLAINTEXT://{host-ip}:{host-port}
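
For example (the IP and port below are illustrative):

listeners=PLAINTEXT://192.168.1.100:9092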

I was using the Kafka setup provided with the MapR sandbox and was trying to access Kafka from Spark code. I was getting the same error because my configuration was missing the IP.

我想做一个坏孩纸
#3 · 2019-04-10 01:05

This is expected behaviour. By setting the replication factor to 1, you have requested that each partition be stored on exactly one machine. When the one machine that happens to store the topic normalized-tenant4 is taken down, the consumer cannot find a leader for the topic.

See http://kafka.apache.org/documentation.html#intro_guarantees.
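
You can check which broker (if any) currently leads the partition with Kafka's describe tool (the ZooKeeper address is illustrative):

bin/kafka-topics.sh --describe --zookeeper zk-host:2181 --topic normalized-tenant4

If the output shows Leader: -1, no broker is currently acting as leader for that partition.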
