I'm trying to read from a Kafka topic with the Spark Streaming direct stream API, but I get the following error:
INFO consumer.SimpleConsumer: Reconnect due to socket error: java.net.SocketTimeoutException
ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: java.net.SocketTimeoutException
org.apache.spark.SparkException: java.net.SocketTimeoutException
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
I have Kafka 0.7.1 and Spark 1.5.2.
I'm using the following code:
import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc: StreamingContext = new StreamingContext(sparkContext, Seconds(60))
val topicsSet = Set("myTopic")
val kafkaParams = Map[String, String]("metadata.broker.list" -> "mybrokerhostname1:9092,mybrokerhostname2:9092")
val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicsSet)
I am sure that the topic already exists because other applications are correctly reading from it.
Try not to use such an old version of Kafka (0.7.1 in your case); as far as I know, the Kafka integration shipped with Spark 1.5.x is built against Kafka 0.8.2.x. If you have a strong reason to stay on 0.7.1, do let me know. Judging by the exception, the application is not able to connect to the Kafka brokers.
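To rule out a plain network problem, you could first check whether the brokers are reachable from the machine where the driver runs. Below is a minimal sketch of such a check. The object name BrokerCheck and its helper functions are my own, hypothetical names, not part of Spark or Kafka. It only tests TCP reachability, so a successful connection does not prove that the broker speaks a protocol version the direct stream understands.

```scala
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper, not part of Spark or Kafka.
object BrokerCheck {

  // Parse a "host1:port1,host2:port2" string, the format used by
  // metadata.broker.list. Assumes every entry contains a ':' and a numeric port.
  def parseBrokers(brokerList: String): Seq[(String, Int)] =
    brokerList.split(",").toSeq.map(_.trim).filter(_.nonEmpty).map { entry =>
      val idx = entry.lastIndexOf(':')
      (entry.substring(0, idx), entry.substring(idx + 1).toInt)
    }

  // Return true if a TCP connection to host:port succeeds within timeoutMs.
  def canConnect(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers timeouts, refused connections and unresolvable host names.
      case _: java.io.IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Broker list taken from the question.
    val brokers = "mybrokerhostname1:9092,mybrokerhostname2:9092"
    parseBrokers(brokers).foreach { case (host, port) =>
      println(s"$host:$port reachable: ${canConnect(host, port)}")
    }
  }
}
```

If every broker is reported as reachable and the SocketTimeoutException still occurs, the broker version becomes the more likely cause.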
I have used this direct stream API to read from Kafka 0.8.2, following this example: https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala
Hope this solves your problem.
Thanks & Regards, Vikas Gite