Exception in thread "main" org.apache.spark.SparkException: Couldn't find leaders for Set([LiveTweets,0])

2019-08-12 04:45发布

问题:

I am working on a data pipeline that takes tweets from Twitter4j -> publishes those tweets to a topic in Kafka -> Spark Streaming subscribes to that topic and processes the tweets. But when I run the code, I get the following exception -

Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.SparkException: Couldn't find leaders for Set([LiveTweets,0])

The code is -

import java.util.HashMap
import java.util.Properties
import twitter4j._
import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.json.DataObjectFactory;
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka._
import kafka.javaapi.producer.Producer
import kafka.producer.{KeyedMessage, ProducerConfig}
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ 

/**
 * Twitter -> Kafka -> Spark Streaming demo pipeline.
 *
 * A twitter4j stream listener publishes the raw JSON of every received status
 * to the Kafka topic `LiveTweets`; a Spark Streaming direct stream reads the
 * same topic and prints the message payloads of each 2-second batch.
 */
object TwitterPopularTags {

  /**
   * Wires the Twitter listener, the Kafka producer and the Spark Streaming
   * consumer together, then blocks until the streaming context terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    /** Information necessary for accessing the Twitter API */
    val consumerKey = ""
    val consumerSecret = ""
    val accessToken = ""
    val accessTokenSecret = ""

    val cb = new ConfigurationBuilder()
    cb.setOAuthConsumerKey(consumerKey)
    cb.setOAuthConsumerSecret(consumerSecret)
    cb.setOAuthAccessToken(accessToken)
    cb.setOAuthAccessTokenSecret(accessTokenSecret)
    // The listener below reads each status back as raw JSON, so keep the JSON store on.
    cb.setJSONStoreEnabled(true)
    cb.setIncludeEntitiesEnabled(true)

    // Must be a `val`: a `def` would build a fresh TwitterStream (and call
    // cb.build() again) on every reference, so the listener and the start /
    // shutdown calls would each act on a different instance.
    val twitterStream = new TwitterStreamFactory(cb.build()).getInstance()

    val kafkaTopic = "LiveTweets"

    /* kafka producer properties */
    val kafkaProducer = {
      val props = new Properties()
      props.put("metadata.broker.list", "localhost:9092")
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      props.put("request.required.acks", "1")
      new Producer[String, String](new ProducerConfig(props))
    }

    /* Invoked when a new tweet comes */
    val listener = new StatusListener() {

      // Forward the tweet's raw JSON to Kafka as an un-keyed message.
      override def onStatus(status: Status): Unit = {
        val msg = new KeyedMessage[String, String](kafkaTopic, DataObjectFactory.getRawJSON(status))
        kafkaProducer.send(msg)
      }

      override def onException(ex: Exception): Unit = throw ex

      // no-op for the following events
      override def onStallWarning(warning: StallWarning): Unit = {}
      override def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice): Unit = {}
      override def onScrubGeo(userId: Long, upToStatusId: Long): Unit = {}
      override def onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = {}
    }

    twitterStream.addListener(listener)

    // Create Spark Streaming context
    val sparkConf = new SparkConf().setAppName("Twitter-Kafka-Spark Streaming")
    val sc = new SparkContext(sparkConf)

    try {
      val ssc = new StreamingContext(sc, Seconds(2))

      // Define the Kafka parameters, broker list must be specified
      val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
      val topics = Set(kafkaTopic)

      // Create the direct stream with the Kafka parameters and topics.
      // NOTE: the topic must already exist on the broker at this point;
      // otherwise this call fails with
      // "Couldn't find leaders for Set([LiveTweets,0])".
      val kafkaStream =
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

      // Keep only the message value (the tweet JSON) and print each batch.
      val tweets = kafkaStream.map(_._2)
      tweets.print()

      // Registering a listener does not open the connection; the stream has to
      // be started explicitly or no tweet is ever published to Kafka.
      twitterStream.sample()

      ssc.start()
      ssc.awaitTermination()
    } finally {
      // Release external resources on normal termination and on failure alike.
      twitterStream.shutdown()
      kafkaProducer.close()
      sc.stop()
    }
  }

}

and the stack trace is -

Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.SparkException: Couldn't find leaders for Set([LiveTweets,0])
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
    at scala.util.Either.fold(Either.scala:97)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:412)
    at TwitterPopularTags$.main(TwitterPopularTags.scala:98)
    at TwitterPopularTags.main(TwitterPopularTags.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
15/08/03 11:34:54 WARN DFSClient: Unable to persist blocks in hflush for /user/spark/applicationHistory/local-1438619692937.inprogress
java.io.IOException: The client is stopped
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1500)
    at org.apache.hadoop.ipc.Client.call(Client.java:1438)
    at org.apache.hadoop.ipc.Client.call(Client.java:1399)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
    at com.sun.proxy.$Proxy19.fsync(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.fsync(ClientNamenodeProtocolTranslatorPB.java:814)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy20.fsync(Unknown Source)
    at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:2067)
    at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1959)
    at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144)
    at org.apache.spark.scheduler.EventLoggingListener.onBlockManagerAdded(EventLoggingListener.scala:171)
    at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:46)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:53)
    at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:36)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:76)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60)
15/08/03 11:34:54 WARN DFSClient: Error while syncing
java.nio.channels.ClosedChannelException
    at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1635)
    at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:2074)
    at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1959)
    at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144)
    at org.apache.spark.scheduler.EventLoggingListener.onBlockManagerAdded(EventLoggingListener.scala:171)
    at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:46)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:53)
    at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:36)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:76)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60)
15/08/03 11:34:54 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144)
    at org.apache.spark.scheduler.EventLoggingListener.onBlockManagerAdded(EventLoggingListener.scala:171)
    at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:46)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:53)
    at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:36)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:76)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60)
Caused by: java.nio.channels.ClosedChannelException
    at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1635)
    at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:2074)
    at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1959)
    at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
    ... 19 more
15/08/03 11:34:54 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144)
    at org.apache.spark.scheduler.EventLoggingListener.onApplicationStart(EventLoggingListener.scala:177)
    at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:52)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:53)
    at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:36)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:76)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60)
Caused by: java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
    at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1998)
    at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1959)
    at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
    ... 19 more

Thank you.

回答1:

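The Kafka topic "LiveTweets" did not exist yet when you first ran the code, so the direct stream could not find a leader for its partition. Make sure the topic exists before the streaming job starts (for example, create it explicitly on the broker), then run the job again.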