I am working on a data pipeline that takes tweets from Twitter4j, publishes them to a Kafka topic, and then has Spark Streaming subscribe to that topic for processing. When I run the code I get this exception:
Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.SparkException: Couldn't find leaders for Set([LiveTweets,0])
The code is:
import java.util.Properties

import twitter4j._
import twitter4j.conf.ConfigurationBuilder
import twitter4j.json.DataObjectFactory

import kafka.serializer.StringDecoder
import kafka.javaapi.producer.Producer
import kafka.producer.{KeyedMessage, ProducerConfig}

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
object TwitterPopularTags {

  def main(args: Array[String]) {

    /** Information necessary for accessing the Twitter API */
    val consumerKey = ""
    val consumerSecret = ""
    val accessToken = ""
    val accessTokenSecret = ""

    val cb = new ConfigurationBuilder()
    cb.setOAuthConsumerKey(consumerKey)
    cb.setOAuthConsumerSecret(consumerSecret)
    cb.setOAuthAccessToken(accessToken)
    cb.setOAuthAccessTokenSecret(accessTokenSecret)
    cb.setJSONStoreEnabled(true)
    cb.setIncludeEntitiesEnabled(true)

    val twitterStream = new TwitterStreamFactory(cb.build()).getInstance()

    val KafkaTopic = "LiveTweets"

    /* Kafka producer properties */
    val kafkaProducer = {
      val props = new Properties()
      props.put("metadata.broker.list", "localhost:9092")
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      props.put("request.required.acks", "1")
      val config = new ProducerConfig(props)
      new Producer[String, String](config)
    }

    /* Invoked when a new tweet arrives: publish its raw JSON to the Kafka topic */
    val listener = new StatusListener() {
      override def onStatus(status: Status): Unit = {
        val msg = new KeyedMessage[String, String](KafkaTopic, DataObjectFactory.getRawJSON(status))
        kafkaProducer.send(msg)
      }
      override def onException(ex: Exception): Unit = throw ex
      // no-op for the following events
      override def onStallWarning(warning: StallWarning): Unit = {}
      override def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice): Unit = {}
      override def onScrubGeo(userId: Long, upToStatusId: Long): Unit = {}
      override def onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = {}
    }
    twitterStream.addListener(listener)

    // Create the Spark Streaming context with a 2-second batch interval
    val sparkConf = new SparkConf().setAppName("Twitter-Kafka-Spark Streaming")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))

    // Define the Kafka parameters; the broker list must be specified
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val topics = Set(KafkaTopic)

    // Create the direct stream with the Kafka parameters and topics
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    val tweets = kafkaStream.map(_._2)
    tweets.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
and the full stack trace is:
Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.SparkException: Couldn't find leaders for Set([LiveTweets,0])
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
at scala.util.Either.fold(Either.scala:97)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:412)
at TwitterPopularTags$.main(TwitterPopularTags.scala:98)
at TwitterPopularTags.main(TwitterPopularTags.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
15/08/03 11:34:54 WARN DFSClient: Unable to persist blocks in hflush for /user/spark/applicationHistory/local-1438619692937.inprogress
java.io.IOException: The client is stopped
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1500)
at org.apache.hadoop.ipc.Client.call(Client.java:1438)
at org.apache.hadoop.ipc.Client.call(Client.java:1399)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
at com.sun.proxy.$Proxy19.fsync(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.fsync(ClientNamenodeProtocolTranslatorPB.java:814)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy20.fsync(Unknown Source)
at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:2067)
at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1959)
at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144)
at org.apache.spark.scheduler.EventLoggingListener.onBlockManagerAdded(EventLoggingListener.scala:171)
at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:46)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:53)
at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:36)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:76)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60)
15/08/03 11:34:54 WARN DFSClient: Error while syncing
java.nio.channels.ClosedChannelException
at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1635)
at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:2074)
at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1959)
at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144)
at org.apache.spark.scheduler.EventLoggingListener.onBlockManagerAdded(EventLoggingListener.scala:171)
at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:46)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:53)
at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:36)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:76)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60)
15/08/03 11:34:54 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144)
at org.apache.spark.scheduler.EventLoggingListener.onBlockManagerAdded(EventLoggingListener.scala:171)
at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:46)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:53)
at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:36)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:76)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60)
Caused by: java.nio.channels.ClosedChannelException
at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1635)
at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:2074)
at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1959)
at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
... 19 more
15/08/03 11:34:54 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144)
at org.apache.spark.scheduler.EventLoggingListener.onApplicationStart(EventLoggingListener.scala:177)
at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:52)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:53)
at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:36)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:76)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60)
Caused by: java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1998)
at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1959)
at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
... 19 more
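In case it is relevant: as far as I understand, createDirectStream asks the broker in metadata.broker.list for the leader of every partition of LiveTweets, so the topic has to already exist with a live leader on localhost:9092. Below is a minimal sketch of how I think that metadata could be checked with the old Kafka 0.8 SimpleConsumer API (the host, port and client id are just assumptions for my local setup, not part of the pipeline code above):

import scala.collection.JavaConverters._
import kafka.javaapi.TopicMetadataRequest
import kafka.javaapi.consumer.SimpleConsumer

object CheckTopicLeader {
  def main(args: Array[String]): Unit = {
    // Connect to the same broker that is listed in metadata.broker.list
    val consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "leader-check")
    try {
      // Ask the broker for metadata about the LiveTweets topic
      val request = new TopicMetadataRequest(java.util.Collections.singletonList("LiveTweets"))
      val response = consumer.send(request)
      for {
        topic     <- response.topicsMetadata.asScala
        partition <- topic.partitionsMetadata.asScala
      } {
        // partition.leader is null when no leader is currently assigned
        val leader = Option(partition.leader).map(b => s"${b.host}:${b.port}").getOrElse("NO LEADER")
        println(s"topic=${topic.topic} partition=${partition.partitionId} leader=$leader")
      }
    } finally {
      consumer.close()
    }
  }
}

If the topic does not show up at all, or a partition prints NO LEADER, I assume that would match the error I am seeing.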
Thank you.