When we are trying to stream the data from SSL enabled Kafka topic we are facing below error . Can you please help us on this issue .
19/11/07 13:26:54 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1573151189884] Added fetcher for partitions ArrayBuffer()
19/11/07 13:26:54 WARN ConsumerFetcherManager$LeaderFinderThread: [spark-streaming-consumer_dvtcbddc101.corp.cox.com-1573151189725-d40a510f-leader-finder-thread], Failed to find leader for Set([inst_monitor_status_test,2], [inst_monitor_status_test,0], [inst_monitor_status_test,1])
java.lang.NullPointerException
at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:408)
at kafka.cluster.Broker.connectionString(Broker.scala:62)
at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Pyspark code :
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient
from kafka import KafkaProducer
def handler(message):
records = message.collect()
for record in records:
print(record)
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 10)
zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
counts.pprint()
kvs.foreachRDD(handler)
ssc.start()
ssc.awaitTermination()
Spark submit command :
Spark submit:
/usr/hdp/2.6.1.0-129/spark2/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 dsstream2.py host:2181 inst_monitor_status_test