How to use Spark Streaming with Kafka with Kerbero

2020-02-09 18:47发布

问题:

I have met some issues while trying to consume messages from Kafka with a Spark Streaming application in a Kerberized Hadoop cluster. I tried both of the two approaches listed here :

  • receiver-based approach : KafkaUtils.createStream
  • direct approach (no receivers) : KafkaUtils.createDirectStream

The receiver-based approach (KafkaUtils.createStream) throws 2 types of exceptions (different exceptions whether I am in local mode (--master local[*]) or in YARN mode (--master yarn --deploy-mode client) :

  • a weird kafka.common.BrokerEndPointNotAvailableException in a Spark local application
  • a Zookeeper timeout in a Spark on YARN application. I once managed to make this work (connecting to Zookeeper successfully), but no messages were received

In both modes (local or YARN), the direct approach (KafkaUtils.createDirectStream) returns an unexplained EOFException (see details below).

My final goal is to launch a Spark Streaming job on YARN, so I will leave the Spark local job aside.

Here is my test environment :

  • Cloudera CDH 5.7.0
  • Spark 1.6.0
  • Kafka 0.10.1.0

I'm working on a single-node cluster (hostname = quickstart.cloudera) for testing purposes. For those interested to reproduce the tests, I'm working on a custom Docker container based on cloudera/quickstart (Git repo).

Below is my sample code I used in a spark-shell. Of course this code works when Kerberos is not enabled : messages produced by kafka-console-producer are received by the Spark application.

import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import kafka.serializer.StringDecoder

val ssc = new StreamingContext(sc, Seconds(5))

val topics = Map("test-kafka" -> 1)

def readFromKafkaReceiver(): Unit = {
    val kafkaParams = Map(
        "zookeeper.connect" -> "quickstart.cloudera:2181",
        "group.id" -> "gid1",
        "client.id" -> "cid1",
        "zookeeper.session.timeout.ms" -> "5000",
        "zookeeper.connection.timeout.ms" -> "5000"
    )

    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY_2)
    stream.print
}

def readFromKafkaDirectStream(): Unit = {
    val kafkaDirectParams = Map(
        "bootstrap.servers" -> "quickstart.cloudera:9092",
        "group.id" -> "gid1",
        "client.id" -> "cid1"
    )

    val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaDirectParams, topics.map(_._1).toSet)
    directStream.print
}

readFromKafkaReceiver() // or readFromKafkaDirectStream()

ssc.start

Thread.sleep(20000)

ssc.stop(stopSparkContext = false, stopGracefully = true)

With Kerberos enabled, this code does not work. I followed this guide : Configuring Kafka Security, and created two configuration files :

jaas.conf :

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/simpleuser/simpleuser.keytab"
principal="simpleuser@CLOUDERA";
};

client.properties :

security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka

I can produce messages with :

export KAFKA_OPTS="-Djava.security.auth.login.config=/home/simpleuser/jaas.conf"
kafka-console-producer \
    --broker-list quickstart.cloudera:9092 \
    --topic test-kafka \
    --producer.config client.properties

But I can't consume those messages from a Spark Streaming application. To launch spark-shell in yarn-client mode, I just created a new JAAS configuration (jaas_with_zk_yarn.conf), with a Zookeeper section (Client), and with the reference to the keytab being only the name of the file (the keytab is then passed through --keytab option) :

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="simpleuser.keytab"
principal="simpleuser@CLOUDERA";
};

Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="simpleuser.keytab"
principal="simpleuser@CLOUDERA";
};

This new file is passed in --files option :

spark-shell --master yarn --deploy-mode client \
    --num-executors 2 \
    --files /home/simpleuser/jaas_with_zk_yarn.conf \
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas_with_zk_yarn.conf" \
    --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas_with_zk_yarn.conf" \
    --keytab /home/simpleuser/simpleuser.keytab \
    --principal simpleuser

I used the same code as previously, except that I added two other Kafka parameters, corresponding to the contents of consumer.properties file :

"security.protocol" -> "SASL_PLAINTEXT",
"sasl.kerberos.service.name" -> "kafka"

readFromKafkaReceiver() throws the following error once Spark Streaming Context is started (cannot connect to Zookeeper) :

ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 5000
        at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1223)
        at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:155)
        at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:129)
        at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:89)
        at kafka.utils.ZkUtils$.apply(ZkUtils.scala:71)
        at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:191)
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:139)
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:156)
        at kafka.consumer.Consumer$.create(ConsumerConnector.scala:109)
        at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565)
        at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2003)
        at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2003)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Sometimes the connection to ZK is established (no timeout reached), but then no messages are received.

readFromKafkaDirectStream() throws the following error as soon as this method is called :

org.apache.spark.SparkException: java.io.EOFException
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.readFromKafkaDirectStream(<console>:47)

There is no more explanation, just an EOFException. I presume there are communication problems between Spark and Kafka broker, but no more explanations. I also tried metadata.broker.list instead of bootstrap.servers, but without success.

Maybe I'm missing something in the JAAS config files, or in Kafka parameters ? Maybe the Spark options (extraJavaOptions) are invalid ? I tried so much possibilities I'm a little bit lost.

I'll be glad if someone could help me to fix at least one of these problems (direct approach or receiver-based). Thanks :)

回答1:

It is not supported with Spark 1.6, as stated in Cloudera docs:

Spark Streaming cannot consume from secure Kafka till it starts using Kafka 0.9 Consumer API

https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_rn_spark_ki.html#ki_spark_streaming_consumer_api

Spark-streaming in 1.6 uses old consumer API, where secure consuming is not supported.

You can use Spark 2.1, which supports secure Kafka: https://blog.cloudera.com/blog/2017/05/reading-data-securely-from-apache-kafka-to-apache-spark/