I'm trying to setup Spark Streaming to get messages from Kafka queue. I'm getting the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o30.createDirectStream.
: org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([test-topic,0])
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
Here is the code I'm executing (pyspark):
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["test-topic"], {"metadata.broker.list": "host.domain:9092"})
ssc.start()
ssc.awaitTermination()
There were a couple of similar posts with the same error. In all cases the cause was the empty kafka topic. There are messages in my "test-topic". I can get them out with
kafka-console-consumer --zookeeper host.domain:2181 --topic test-topic --from-beginning --max-messages 100
Does anyone know what might be the problem?
I'm using:
- Spark 1.5.2 (apache)
- Kafka 0.8.2.0+kafka1.3.0 (CDH 5.4.7)
1) You have to make sure that you have already created topic
test-topic
Run following command to check list of the topic
kafka-topics.sh --list --zookeeper [host or ip of zookeeper]:[port]
2) After checking your topic, you have to configure your Kafka configuration in
Socket Server Settings
sectionlisteners=PLAINTEXT://[host or ip of Kafka]:[port]
Another option to force creating topic if it doesn't exist. You can do this by setting property "auto.create.topics.enable" to "true" in kafkaParams map like this.
Using Scala 2.11 and Kafka 0.10 versions.
One of the reason for this type of error where leader cannot be found for specified topic is Problem with one's Kafka server configs.
Open your Kafka server configs :
In the "Socket Server Settings" section , provide IP for your host if its missing :
I was using Kafka setup provided with MapR sandbox and was trying to access the kafka via spark code. I was getting the same error while accessing my kafka since my configuration was missing the IP.
You need to check 2 things:
check if this topic and partition exists , in your case is topic is
test-topic
and partition is 0.based on your code, you are trying consume message from offset 0 and it might be possible message is not available from offset 0, check what is you earliest offset and try consume from there.
Below is command to check earliest offset:
If you define short host names in /etc/hosts and use them in your kafka servers' configurations, you should change those name to ip. Or register the same short host name in your local PC or client's /etc/hosts.
Error occurred because Spark streaming lib can't resolve short hostname in the PC or client.