I am a beginner in Spark Streaming and Scala. For a project requirement I was trying to run the TwitterPopularTags example present on GitHub. As SBT assembly was not working for me and I was not familiar with SBT, I tried using Maven for the build. After a lot of initial hiccups, I was able to create the jar file. But while trying to execute it I am getting the following error. Can anybody help me resolve this?
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/twitter/TwitterUtils$
at TwitterPopularTags$.main(TwitterPopularTags.scala:43)
at TwitterPopularTags.main(TwitterPopularTags.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:331)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.twitter.TwitterUtils$
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 9 more
I have added the following dependencies:
spark-streaming_2.10:1.1.0
spark-core_2.10:1.1.0
spark-streaming-twitter_2.10:1.1.0
I even tried 1.2.0 for spark-streaming-twitter, but that also gave me the same error.
Thanks in advance for the help.
Regards,
vpv
Thank you for your suggestion. I was able to resolve this issue by switching to SBT assembly. Following are the details of how I did it.
Spark - Already present in the Cloudera VM
Scala - Not sure if this is present in the Cloudera VM; if not, we can install it
SBT - This also needs to be installed. I did both installs on my local machine and transferred the jar to the VM. For installing these I used the following link:
https://gist.github.com/visenger/5496675
1) Once all these are in place, we have to create the parent folder for our project. I created a folder called Twitter.
2) Create another folder with the structure Twitter/src/main/scala and create the .scala file in this folder with the name TwitterPopularTags.scala. This has slight changes from the code we got from GitHub; I had to change the import statements (a short sketch of how these imports are used follows the import list below):
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
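For context, here is a minimal sketch of how these imports fit together (the app name, batch interval, and window settings below are illustrative assumptions, not the exact GitHub code):

object TwitterPopularTags {
  def main(args: Array[String]) {
    // Twitter credentials come from the command line and are handed to twitter4j via system properties
    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val sparkConf = new SparkConf().setAppName("TwitterPopularTags")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // This call is what needs spark-streaming-twitter on the runtime classpath
    val stream = TwitterUtils.createStream(ssc, None)
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
    hashTags.countByValueAndWindow(Seconds(60), Seconds(10)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}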
3) After this, create another folder under the parent folder with the following name:
Twitter/project
and in it create a file with the name assembly.sbt. This specifies the resolver and the assembly plugin to use. Following is the full content of the file:
resolvers += Resolver.url("sbt-plugin-releases-scalasbt", url("http://repo.scala-sbt.org/scalasbt/sbt-plugin-releases/"))
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")
4) Once the above two are created, create a file in the parent directory of the project (Twitter) with the name build.sbt. This is where we provide the name of the jar file to create as well as the dependencies. Please note that even the blank lines between the settings in this file are important.
name := "TwitterPopularTags"
version := "1.0"
mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
{
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
}
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.1.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.2.0"
libraryDependencies += "org.twitter4j" % "twitter4j-stream" % "3.0.3"
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
5) Finally we have to open the terminal and go to the parent folder of the project (Twitter). From here enter the following command:
sbt assembly
This will download the dependencies and create the jar file we need.
6) In order to run the program we need a Twitter app created under our ID, and we need to provide its auth token and other details. The detailed steps on how to create this are in the following link:
http://ampcamp.berkeley.edu/3/exercises/realtime-processing-with-spark-streaming.html
7) Once all the above is done, we can use the spark-submit command from the VM to run the job. An example command is:
./bin/spark-submit \
--class TwitterPopularTags \
--master local[4] \
/path/to/TwitterPopularTags.jar \
consumerkey consumersecret accesstoken accesssecret
8) This prints the output to the console, so to monitor the output it is better to reduce the print frequency by adjusting the code, for example as in the sketch below.
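One possible tweak (a sketch only; the 60-second window and 30-second slide are assumed values, and hashTags is the hashtag DStream from the example) is to print once per longer sliding window instead of on every batch:

val topCounts = hashTags.map((_, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(60), Seconds(30))
  .map { case (topic, count) => (count, topic) }
  .transform(_.sortByKey(false))

topCounts.foreachRDD { rdd =>
  // Printing once every 30 seconds keeps the console readable
  val topList = rdd.take(10)
  println("Popular topics in last 60 seconds (%s total):".format(rdd.count()))
  topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
}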
Please let me know if any more details are required.
Thanks & Regards,
VPV
Found an easy solution (it works with 1.5.1 for sure, but maybe with earlier versions too): submit with the --packages parameter and Maven coordinates, like:
spark-submit --master local[*] \
--class TwitterStreaming \
--packages "org.apache.spark:spark-streaming-twitter_2.10:1.5.1" \
${PATH_TO_JAR_IN_TARGET}
Described at
http://spark.apache.org/docs/latest/programming-guide.html#using-the-shell
This error simply means that the TwitterUtils class (or, in Scala terms, the TwitterUtils object) is not present at runtime, even though it was present at compile time (otherwise you would not have been able to build with Maven). You should make sure that the jar file you created indeed contains that class/object. You can simply unzip the jar file to see what is actually included. Most likely your Maven build uses the dependency to compile the project but does not eventually include it in your final jar.
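If you want to check this from code rather than by unzipping the jar, here is a quick sketch (the jar path is just a placeholder):

import java.util.jar.JarFile
import scala.collection.JavaConverters._

// List every entry in the jar and keep only the ones mentioning TwitterUtils
val entries = new JarFile("/path/to/your-app.jar").entries().asScala
entries.map(_.getName).filter(_.contains("TwitterUtils")).foreach(println)

If nothing is printed, the spark-streaming-twitter classes never made it into the jar.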
Try to do it this way...
./bin/spark-submit \
--class TwitterPopularTags \
--jars (external jars such as twitter4j-stream and spark-streaming-twitter) \
--master local[4] \
/path/to/TwitterPopularTags.jar \
consumerkey consumersecret accesstoken accesssecret
**I have the same problem and I am not able to fix it.**
name := "SentimentAnalyser"
version := "0.1"
scalaVersion := "2.11.11"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.2.0"
// https://mvnrepository.com/artifact/org.apache.spark/park-streaming-twitter_2.11
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11
libraryDependencies += "org.apache.spark" % "spark-streaming-twitter_2.11" % "2.0.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.2.0"
package com

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

object Sentiment {
  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.out.print("Enter Consumer Key (API Key), Consumer Secret (API Secret), Access Token, Access Token Secret")
      System.exit(1)
    }
    val Array(customer_key, customer_secret, access_token, access_token_secret) = args.take(4)

    // twitter4j reads its OAuth credentials from these system properties
    System.setProperty("twitter4j.oauth.consumerKey", customer_key)
    System.setProperty("twitter4j.oauth.consumerSecret", customer_secret)
    System.setProperty("twitter4j.oauth.accessToken", access_token)
    System.setProperty("twitter4j.oauth.accessTokenSecret", access_token_secret)

    val conf = new SparkConf().setAppName("Sentiment").setMaster("local")
    val scc = new StreamingContext(conf, Seconds(30))

    // DStream of hashtags extracted from the tweet text
    val stream = TwitterUtils.createStream(scc, None)
    val hashTag = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    // (count, tag) pairs sorted by count over 60- and 10-second windows
    val topHashTag60 = hashTag.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }.transform(_.sortByKey(false))

    val topHashTag10 = hashTag.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
      .map { case (topic, count) => (count, topic) }.transform(_.sortByKey(false))

    topHashTag60.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("Popular topics in last 60 sec (%s total)".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    topHashTag10.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("Popular topics in last 10 sec (%s total)".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    scc.start()
    scc.awaitTermination()
  }
}
I built the jar using an artifact in IntelliJ IDEA.
spark-submit --class com.Sentiment /root/Desktop/SentimentAnalyser.jar XX XX XX XX
ERROR:
17/10/29 01:22:24 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.46.132, 34179, None)
17/10/29 01:22:27 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/twitter/TwitterUtils$
at com.Sentiment$.main(Sentiment.scala:26)
at com.Sentiment.main(Sentiment.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.twitter.TwitterUtils$
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java
To include the dependencies in your jar you will need to instruct Maven to build a "fat jar". A "fat jar" is a jar that includes the .class files not only for your project, but for all required dependencies as well (this is what sbt assembly does). The default Maven behavior is to treat your project like a library and therefore build a jar with only your .class files.
Here is a simple Maven pom which will do what you want. Note that I have included some other common Spark + Maven behaviors, such as using Scala, but the most relevant part is near the bottom:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.jmess.sparkexamples</groupId>
    <artifactId>example</artifactId>
    <version>1.0.0</version>

    <properties>
        <!-- Use java 1.8 -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!-- Keep compiled against scala version uniform -->
        <scala.base.version>2.11</scala.base.version>
        <!-- Use most recent version of Scala compatible with stable Spark release -->
        <scala.version>${scala.base.version}.12</scala.version>
        <!-- Facilitates keeping multiple Spark dependencies aligned -->
        <spark.version>2.4.0</spark.version>
    </properties>

    <dependencies>
        <!-- Begin Spark Dependencies -->
        <!-- Provides the base Spark APIs. Required for base functionality -->
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.base.version}</artifactId>
            <version>${spark.version}</version>
            <!-- In most cases this dependency is supplied by Spark -->
            <scope>provided</scope>
        </dependency>
        <!-- Provides the expanded APIs for Streaming with Kafka. Required in addition to spark-sql library -->
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.base.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- End Spark Dependencies -->

        <!-- Popular scala configuration library -->
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.3.2</version>
        </dependency>
        <!-- To write to Splunk HTTP endpoint -->
    </dependencies>

    <build>
        <!-- Tells scala-maven-plugin where to look -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>

        <plugins>
            <!-- For building scala projects using maven -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.0.1</version>
                <!-- Includes the compiled Scala .class files in some maven goals -->
                <executions>
                    <execution>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- !!!!!!! BUILD FAT JAR !!!!!!! -->
            <!-- Build a fat jar named example-1.0.0-jar-with-dependencies.jar -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
                        <phase>package</phase> <!-- bind to the packaging phase -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
**Note:** if you are submitting your job through Spark instead of YARN, then uncomment the <scope>provided</scope> line.