I have run into some issues while trying to consume messages from Kafka with a Spark Streaming application in a Kerberized Hadoop cluster. I tried both approaches listed here:
- receiver-based approach: KafkaUtils.createStream
- direct approach (no receivers): KafkaUtils.createDirectStream
The receiver-based approach (KafkaUtils.createStream) throws two types of exceptions, depending on whether I am in local mode (--master local[*]) or in YARN mode (--master yarn --deploy-mode client):
- a weird kafka.common.BrokerEndPointNotAvailableException in a Spark local application;
- a Zookeeper timeout in a Spark on YARN application. I once managed to make this work (connecting to Zookeeper successfully), but no messages were received.
In both modes (local or YARN), the direct approach (KafkaUtils.createDirectStream) throws an unexplained EOFException (see details below).
My final goal is to launch a Spark Streaming job on YARN, so I will leave the Spark local job aside.
Here is my test environment:
- Cloudera CDH 5.7.0
- Spark 1.6.0
- Kafka 0.10.1.0
I'm working on a single-node cluster (hostname = quickstart.cloudera) for testing purposes. For those interested in reproducing the tests, I'm working on a custom Docker container based on cloudera/quickstart (Git repo).
Below is the sample code I used in a spark-shell. Of course, this code works when Kerberos is not enabled: messages produced by kafka-console-producer are received by the Spark application.
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import kafka.serializer.StringDecoder

// 5-second batches on top of the shell's existing SparkContext (sc)
val ssc = new StreamingContext(sc, Seconds(5))
// topic -> number of consumer threads (used by the receiver-based approach)
val topics = Map("test-kafka" -> 1)

// Receiver-based approach: connects through Zookeeper
def readFromKafkaReceiver(): Unit = {
  val kafkaParams = Map(
    "zookeeper.connect" -> "quickstart.cloudera:2181",
    "group.id" -> "gid1",
    "client.id" -> "cid1",
    "zookeeper.session.timeout.ms" -> "5000",
    "zookeeper.connection.timeout.ms" -> "5000"
  )
  val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY_2)
  stream.print()
}

// Direct approach (no receivers): queries the Kafka brokers directly
def readFromKafkaDirectStream(): Unit = {
  val kafkaDirectParams = Map(
    "bootstrap.servers" -> "quickstart.cloudera:9092",
    "group.id" -> "gid1",
    "client.id" -> "cid1"
  )
  val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaDirectParams, topics.map(_._1).toSet)
  directStream.print()
}

readFromKafkaReceiver() // or readFromKafkaDirectStream()
ssc.start()
Thread.sleep(20000) // let a few batches run
// Stop streaming but keep the SparkContext so the shell stays usable
ssc.stop(stopSparkContext = false, stopGracefully = true)
With Kerberos enabled, this code does not work. I followed this guide: Configuring Kafka Security, and created two configuration files:
jaas.conf:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/simpleuser/simpleuser.keytab"
principal="simpleuser@CLOUDERA";
};
client.properties:
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
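The two Kerberos-related Kafka parameters added to the Spark code later in this post are the same as the contents of client.properties, so one option is to load that file once and merge it into each kafkaParams map instead of copying the values by hand. A minimal sketch in plain Scala, assuming the file is a standard Java properties file (KafkaSecurityParams and its methods are hypothetical names, not part of Spark or Kafka):

```scala
import java.io.Reader
import java.util.Properties
import scala.collection.JavaConverters._

object KafkaSecurityParams {
  // Parses Java-properties content (such as client.properties) into an immutable Map
  def load(reader: Reader): Map[String, String] = {
    val props = new Properties()
    props.load(reader)
    props.stringPropertyNames().asScala.map(k => k -> props.getProperty(k)).toMap
  }

  // Adds the security entries to an existing kafkaParams map.
  // With ++, entries from `security` replace any existing entries with the same key.
  def withSecurity(base: Map[String, String], security: Map[String, String]): Map[String, String] =
    base ++ security
}
```

In spark-shell this could be used as KafkaSecurityParams.withSecurity(kafkaDirectParams, KafkaSecurityParams.load(new java.io.FileReader("client.properties"))), where the relative path is an assumption about the working directory.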
I can produce messages with:
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/simpleuser/jaas.conf"
kafka-console-producer \
--broker-list quickstart.cloudera:9092 \
--topic test-kafka \
--producer.config client.properties
But I can't consume those messages from a Spark Streaming application. To launch spark-shell in yarn-client mode, I created a new JAAS configuration (jaas_with_zk_yarn.conf) that adds a Zookeeper section (Client) and references the keytab only by its file name (the keytab itself is then passed through the --keytab option):
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="simpleuser.keytab"
principal="simpleuser@CLOUDERA";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="simpleuser.keytab"
principal="simpleuser@CLOUDERA";
};
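The KafkaClient section is the one read by the Kafka client and the Client section is the one read by the ZooKeeper client, and here both carry the same Kerberos options. If several variants of this file are needed (local paths vs. bare keytab names for YARN), a tiny generator keeps the sections in sync. A sketch under that assumption (JaasFile is a hypothetical name; the keytab and principal values are the ones from this post):

```scala
object JaasFile {
  // Renders one JAAS login section using Krb5LoginModule with a keytab
  def section(name: String, keyTab: String, principal: String): String =
    s"""$name {
       |  com.sun.security.auth.module.Krb5LoginModule required
       |  useKeyTab=true
       |  keyTab="$keyTab"
       |  principal="$principal";
       |};
       |""".stripMargin

  // KafkaClient is used by the Kafka client, Client by the ZooKeeper client
  def render(keyTab: String, principal: String): String =
    Seq("KafkaClient", "Client").map(section(_, keyTab, principal)).mkString("\n")
}
```

JaasFile.render("simpleuser.keytab", "simpleuser@CLOUDERA") yields the same two sections as jaas_with_zk_yarn.conf, separated by a blank line.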
This new file is passed with the --files option:
spark-shell --master yarn --deploy-mode client \
--num-executors 2 \
--files /home/simpleuser/jaas_with_zk_yarn.conf \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas_with_zk_yarn.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas_with_zk_yarn.conf" \
--keytab /home/simpleuser/simpleuser.keytab \
--principal simpleuser
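One way to check whether the --files and extraJavaOptions settings actually take effect is to ask the JDK's own JAAS configuration which login modules it sees for the KafkaClient and Client sections, on the driver and on the executors. A minimal diagnostic sketch that relies only on javax.security.auth.login.Configuration (JaasCheck is a hypothetical name):

```scala
import javax.security.auth.login.Configuration
import scala.util.Try

object JaasCheck {
  // For each JAAS login context name, returns the login module class names that the
  // JVM's JAAS configuration defines for it, or None if the section is missing
  // (or if the configuration could not be loaded at all).
  def loginModules(sections: Seq[String]): Map[String, Option[Seq[String]]] = {
    // The default JDK implementation reads the file named by
    // -Djava.security.auth.login.config the first time this is called
    val config: Option[Configuration] = Try(Configuration.getConfiguration).toOption
    sections.map { name =>
      val entries = config.flatMap(c => Option(c.getAppConfigurationEntry(name)))
      name -> entries.map(_.toSeq.map(_.getLoginModuleName))
    }.toMap
  }

  // The JAAS file path this JVM was started with, if any
  def configPath: Option[String] = sys.props.get("java.security.auth.login.config")
}
```

On the driver, JaasCheck.loginModules(Seq("KafkaClient", "Client")) can be called directly; for the executors, something like sc.parallelize(1 to 2, 2).map(_ => JaasCheck.loginModules(Seq("KafkaClient", "Client")).toString).collect() may work, although REPL-defined objects sometimes need adjusting before they serialize cleanly. A None entry means that JVM did not find the section.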
I used the same code as before, except that I added two more Kafka parameters, corresponding to the contents of the client.properties file:
"security.protocol" -> "SASL_PLAINTEXT",
"sasl.kerberos.service.name" -> "kafka"
readFromKafkaReceiver() throws the following error once the Spark Streaming context is started (it cannot connect to Zookeeper):
ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 5000
at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1223)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:155)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:129)
at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:89)
at kafka.utils.ZkUtils$.apply(ZkUtils.scala:71)
at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:191)
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:139)
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:156)
at kafka.consumer.Consumer$.create(ConsumerConnector.scala:109)
at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565)
at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2003)
at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2003)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Sometimes the connection to ZK is established (no timeout reached), but then no messages are received.
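To separate a plain network problem from an authentication problem, a quick TCP reachability check from the same hosts can help: if quickstart.cloudera:2181 is reachable but ZkClient still times out, the cause is more likely in the session or SASL handshake than in basic connectivity. A minimal sketch with java.net.Socket (PortProbe is a hypothetical name, and the 5000 ms default simply mirrors the timeout used above):

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

object PortProbe {
  // Returns true if a plain TCP connection to host:port opens within timeoutMs.
  // This only tests network reachability, not Kerberos/SASL authentication.
  def reachable(host: String, port: Int, timeoutMs: Int = 5000): Boolean = {
    val socket = new Socket()
    try Try(socket.connect(new InetSocketAddress(host, port), timeoutMs)).isSuccess
    finally socket.close()
  }
}
```

The same check can be pointed at the broker port (quickstart.cloudera:9092).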
readFromKafkaDirectStream() throws the following error as soon as this method is called:
org.apache.spark.SparkException: java.io.EOFException
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.readFromKafkaDirectStream(<console>:47)
There is no further explanation, just an EOFException. I presume there is a communication problem between Spark and the Kafka broker, but I have nothing more specific. I also tried metadata.broker.list instead of bootstrap.servers, without success.
Maybe I'm missing something in the JAAS config files or in the Kafka parameters? Maybe the Spark options (extraJavaOptions) are invalid? I have tried so many possibilities that I'm a little bit lost.
I'd be glad if someone could help me fix at least one of these problems (direct approach or receiver-based approach). Thanks :)