Scala fat jar dependency issue while Job submit

2020-04-16 05:04发布

问题:

I have written simple kafka stream using Scala. It is working good in local. I have taken fat jar and submitted in scala cluster. I am getting class not found error after submit the job. if I extract the fat jar, it has all dependency inside the fat jar.

why I am getting class not found error ?. How to solve this ?

Note: if I deploy(copy) the fat jar into Spark/jars folder manually. I don't see any issue. But, it is not correct approach

I am using window 7 & running master and worker node on the same machine.

JOB Submit

spark-2.2\bin>spark-submit --class Welcome --master spark://169.254.208.125:7077 C:\Gnana\cass-conn-assembly-0.1.jar

Code

import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkConf
import com.datastax.spark.connector.streaming._


object Welcome {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("demo").setMaster("spark://169.254.208.125:7077");
    conf.set("spark.cassandra.connection.host", "192.168.1.2")
    val sc = new SparkContext(conf)

    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Milliseconds(100))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.1.2:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("test")
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    val lines = messages.map(_.value)
    lines.map(li=>{val arr= li.split(",");(arr(0).toInt,arr(1),arr(2),arr(3))}).saveToCassandra("inventory", "emp",SomeColumns("emp_id","create_date","emp_city","emp_name"))
    println(" Spark is ready !!!!!! ");

    /*sys.ShutdownHookThread {
      println("Gracefully stopping Spark Streaming Application")
      ssc.stop(stopSparkContext = true, stopGracefully = true)
      println("Application stopped")
    }*/

    ssc.start();
    ssc.awaitTermination();
  }
  def sayHello(msg:String): Unit = {
    print("welcome to Sacala "+msg);
  }
}

build.sbt

organization := "com.demo"
name := "cass-conn"
version := "0.1"
scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
val connectorVersion = "2.0.7"
val kafka_stream_version = "1.6.3"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion  % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion  % "provided",
  "org.apache.spark" %% "spark-hive" % sparkVersion  % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % connectorVersion  ,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0",
  "org.apache.spark" %% "spark-streaming" %  "2.2.0"  % "provided",
)

mergeStrategy in assembly := {
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  case x => (mergeStrategy in assembly).value(x)
}

Error 1:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/clients/consumer/Consumer
        at org.apache.spark.streaming.kafka010.ConsumerStrategies$.Subscribe(ConsumerStrategy.scala:256)
        at Welcome$.main(Welcome.scala:32)
        at Welcome.main(Welcome.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.sca
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.Consumer
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 12 more

Build Log

[info] Done updating.
[info] Compiling 2 Scala sources to C:\Gnana\cass-conn\target\scala-2.11\classes ...
[info] Done compiling.
[info] Including: slf4j-api-1.7.21.jar
[info] Including: joda-time-2.3.jar
[info] Including: kafka_2.11-0.10.1.0.jar
[info] Including: scala-library-2.11.8.jar
[info] Including: jopt-simple-4.9.jar
[info] Including: metrics-core-2.2.0.jar
[info] Including: slf4j-log4j12-1.7.21.jar
[info] Including: log4j-1.2.17.jar
[info] Including: joda-convert-1.2.jar
[info] Including: zkclient-0.9.jar
[info] Including: scala-reflect-2.11.8.jar
[info] Including: zookeeper-3.4.8.jar
[info] Including: jline-0.9.94.jar
[info] Including: netty-3.7.0.Final.jar
[info] Including: lz4-1.3.0.jar
[info] Including: scala-parser-combinators_2.11-1.0.4.jar
[info] Including: snappy-java-1.1.2.6.jar
[info] Including: kafka-clients-0.10.1.0.jar
[info] Including: spark-streaming-kafka-0-10_2.11-2.2.0.jar
[info] Including: spark-tags_2.11-2.2.0.jar
[info] Including: unused-1.0.0.jar
[info] Including: netty-all-4.0.33.Final.jar
[info] Including: commons-beanutils-1.9.3.jar
[info] Including: jsr166e-1.1.0.jar
[info] Including: spark-cassandra-connector_2.11-2.0.7.jar
[info] Including: commons-collections-3.2.2.jar
[info] Checking every *.class/*.jar file's SHA-1.
[info] Merging files...
[warn] Merging 'NOTICE' with strategy 'rename'
[warn] Merging 'META-INF\NOTICE.txt' with strategy 'rename'
[warn] Merging 'META-INF\NOTICE' with strategy 'rename'
[warn] Merging 'org\xerial\snappy\native\README' with strategy 'rename'
[warn] Merging 'META-INF\LICENSE.txt' with strategy 'rename'
[warn] Merging 'META-INF\license' with strategy 'rename'
[warn] Merging 'LICENSE.txt' with strategy 'rename'
[warn] Merging 'META-INF\LICENSE' with strategy 'rename'
[warn] Merging 'LICENSE' with strategy 'rename'
[warn] Merging 'META-INF\DEPENDENCIES' with strategy 'discard'
[warn] Merging 'META-INF\INDEX.LIST' with strategy 'discard'
[warn] Merging 'META-INF\MANIFEST.MF' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.datastax.cassandra\cassandra-driver-core\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.datastax.cassandra\cassandra-driver-core\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.datastax.cassandra\cassandra-driver-mapping\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.datastax.cassandra\cassandra-driver-mapping\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jffi\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jffi\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-constants\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-constants\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-ffi\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-ffi\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-posix\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-posix\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-x86asm\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-x86asm\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.google.guava\guava\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.google.guava\guava\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.twitter\jsr166e\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.twitter\jsr166e\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.yammer.metrics\metrics-core\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.yammer.metrics\metrics-core\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\commons-beanutils\commons-beanutils\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\commons-beanutils\commons-beanutils\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\commons-collections\commons-collections\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\commons-collections\commons-collections\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\io.netty\netty-all\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\io.netty\netty-all\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\io.netty\netty\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\io.netty\netty\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\jline\jline\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\jline\jline\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\joda-time\joda-time\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\joda-time\joda-time\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\log4j\log4j\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\log4j\log4j\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\net.sf.jopt-simple\jopt-simple\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\net.sf.jopt-simple\jopt-simple\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.apache.spark\spark-streaming-kafka-0-10_2.11\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.apache.spark\spark-streaming-kafka-0-10_2.11\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.apache.spark\spark-tags_2.11\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.apache.spark\spark-tags_2.11\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.joda\joda-convert\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.joda\joda-convert\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.slf4j\slf4j-api\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.slf4j\slf4j-api\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.slf4j\slf4j-log4j12\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.slf4j\slf4j-log4j12\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.spark-project.spark\unused\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.spark-project.spark\unused\pom.xml' with strategy 'discard'
[warn] Merging 'org\apache\spark\unused\UnusedStubClass.class' with strategy 'first'
[warn] Strategy 'discard' was applied to 51 files
[warn] Strategy 'first' was applied to a file
[warn] Strategy 'rename' was applied to 9 files
[info] SHA-1: 85f8513511b46290883ab70f2525b04a8d3c33d7

回答1:

You are missing the kafka dependency:

"org.apache.kafka" %% "kafka" % "0.10.1.0"

Add that an you will be all good.

I have my build.sbt setup like this:

"org.apache.kafka" %% "kafka"                      % "0.10.1.0",
"org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0",
"org.apache.spark" %% "spark-streaming"            % "2.2.0" % "provided"


回答2:

Try changing your spark-streaming-kafka dependency to

"org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0"

build a fresh fat jar and see if this solves the problem.

final build.sbt looks like

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion  % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion  % "provided",
  "org.apache.spark" %% "spark-hive" % sparkVersion  % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % connectorVersion  ,
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming" %  "2.2.0"
)