Spark Streaming + json4s-jackson dependency problem

Posted 2020-04-12 02:00

Question:

I am unable to use json4s-jackson 3.2.11 in my Spark 1.4.1 Streaming application.

Suspecting that the problem was caused by the existing json4s dependency in the spark-core project, as explained here: Is it possible to use json4s 3.2.11 with Spark 1.3.0?, I built Spark from source with an adjusted core/pom.xml. I changed the reference from json4s-jackson_2.10:3.2.10 to 3.2.11, as the 3.2.10 version does not support extracting to implicit types.
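
For reference, the change in core/pom.xml amounted to bumping the json4s version, roughly as follows (a sketch; Spark's actual pom derives the _2.10 suffix from a ${scala.binary.version} property):

    <dependency>
      <groupId>org.json4s</groupId>
      <artifactId>json4s-jackson_2.10</artifactId>
      <version>3.2.11</version> <!-- previously 3.2.10 -->
    </dependency>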

I have replaced the source jars referenced in my IntelliJ IDEA project with the rebuilt jars, but I am still getting the same errors as before. I fear that Spark must somehow still be referencing json4s 3.2.10.
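
One way to check which json4s version actually ends up on the application classpath (a diagnostic suggestion, not from the original post) is Maven's dependency tree, filtered to the org.json4s group:

    mvn dependency:tree -Dincludes=org.json4s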

Here is my simple test:

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.json4s._
import org.json4s.jackson.JsonMethods._

object StreamingPredictor {

  implicit val formats = DefaultFormats

  case class event(Key: String,
                   sensorId: String,
                   sessionId: String,
                   deviceId: String,
                   playerId: String,
                   impressionId: String,
                   time: String,
                   eventName: String,
                   eventProperties: Map[String, Any],
                   dl: Array[List[(String, Any)]],
                   $post: Boolean,
                   $sync: Boolean)

  // Parse one JSON message and pull out its eventName field
  def parser(json: String): String = {
    val parsedJson = parse(json)
    val foo = parsedJson.extract[event]
    foo.eventName
  }

  def main(args: Array[String]) {

    val zkQuorum = "localhost:2181"
    val group = "myGroup"
    val topic = Map("test" -> 1)
    val sparkContext = new SparkContext("local[4]", "KafkaConsumer")
    val ssc = new StreamingContext(sparkContext, Seconds(1))

    // Kafka messages arrive as (key, value) pairs; the JSON payload is the value
    val json = KafkaUtils.createStream(ssc, zkQuorum, group, topic)
    val eventName = json.map(_._2).map(parser)

    eventName.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

The error I get when referencing json4s 3.2.11 in my application pom.xml file:

java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.render(Lorg/json4s/JsonAST$JValue;)Lorg/json4s/JsonAST$JValue;
        at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143)
        at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:143)
        at org.apache.spark.scheduler.EventLoggingListener.onBlockManagerAdded(EventLoggingListener.scala:174)
        at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:46)
        at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
        at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
        at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:56)
        at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:79)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1215)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)

And the error I get when I use json4s-jackson_2.10:3.2.10 in my application pom.xml file:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): org.json4s.package$MappingException: No usable value for eventProperties
No information known about type
        at org.json4s.reflect.package$.fail(package.scala:96)
        at org.json4s.Extraction$ClassInstanceBuilder.org$json4s$Extraction$ClassInstanceBuilder$$buildCtorArg(Extraction.scala:443)
        at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$14.apply(Extraction.scala:463)
        at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$14.apply(Extraction.scala:463)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.json4s.Extraction$ClassInstanceBuilder.org$json4s$Extraction$ClassInstanceBuilder$$instantiate(Extraction.scala:451)
        at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$result$6.apply(Extraction.scala:491)
        at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$result$6.apply(Extraction.scala:488)
        at org.json4s.Extraction$.org$json4s$Extraction$$customOrElse(Extraction.scala:500)
        at org.json4s.Extraction$ClassInstanceBuilder.result(Extraction.scala:488)
        at org.json4s.Extraction$.extract(Extraction.scala:332)
        at org.json4s.Extraction$.extract(Extraction.scala:42)
        at org.json4s.ExtractableJsonAstNode.extract(ExtractableJsonAstNode.scala:21)
        at com.pca.triggar.Streaming.StreamingPredictor$.parser(StreamingPredictor.scala:38)
        at com.pca.triggar.Streaming.StreamingPredictor$$anonfun$2.apply(StreamingPredictor.scala:57)
        at com.pca.triggar.Streaming.StreamingPredictor$$anonfun$2.apply(StreamingPredictor.scala:57)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1276)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1276)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.json4s.package$MappingException: No information known about type
        at org.json4s.Extraction$ClassInstanceBuilder.org$json4s$Extraction$ClassInstanceBuilder$$instantiate(Extraction.scala:465)
        at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$result$6.apply(Extraction.scala:491)
        at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$result$6.apply(Extraction.scala:488)
        at org.json4s.Extraction$.org$json4s$Extraction$$customOrElse(Extraction.scala:500)
        at org.json4s.Extraction$ClassInstanceBuilder.result(Extraction.scala:488)
        at org.json4s.Extraction$.extract(Extraction.scala:332)
        at org.json4s.Extraction$$anonfun$extract$5.apply(Extraction.scala:316)
        at org.json4s.Extraction$$anonfun$extract$5.apply(Extraction.scala:316)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.json4s.Extraction$.extract(Extraction.scala:316)
        at org.json4s.Extraction$ClassInstanceBuilder.org$json4s$Extraction$ClassInstanceBuilder$$buildCtorArg(Extraction.scala:431)
        ... 42 more

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Answer 1:

OK, I found the problem. As posted elsewhere, you need to compile against json4s 3.2.10. Apparently, doing so produces a binary that works with Spark (version 1.5 in my case, and the same holds for some earlier versions). The cause is the default parameter added to the render() method in 3.2.11.
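
Concretely, the two releases declare render() with different binary signatures, so a Spark build compiled against 3.2.10 cannot find the method at runtime when 3.2.11 is on the classpath (a sketch paraphrasing the json4s sources):

    // json4s 3.2.10: compiles to render(JValue): JValue
    def render(value: JValue): JValue

    // json4s 3.2.11: the implicit parameter with a default value compiles to
    // render(JValue, Formats): JValue, hence the NoSuchMethodError raised from
    // Spark's EventLoggingListener, which was built against the old signature
    def render(value: JValue)(implicit formats: Formats = DefaultFormats): JValue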



Answer 2:

I had the same issue on EMR 4.3.0 with Spark 1.6 and solved it by installing json4s in a bootstrap action:

  1. Download the json4s jar and put it in S3.
  2. Create the following shell script and put it in S3:

     #!/bin/bash
     # Fetch the json4s jar from S3 and place it on the cluster's classpath
     set -e
     wget -S -T 10 -t 5 https://s3.amazonaws.com/your-bucketname/json4s-native_2.10-3.2.4.jar
     mkdir -p /home/hadoop/lib
     mv json4s-native_2.10-3.2.4.jar /home/hadoop/lib/

  3. Add the script as a bootstrap action in the EMR launch steps (see the CLI sketch below).
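
For example, when launching the cluster with the AWS CLI, the script can be registered roughly like this (a sketch; the bucket, script name, and instance settings are placeholders):

    aws emr create-cluster \
      --release-label emr-4.3.0 \
      --applications Name=Spark \
      --bootstrap-actions Path=s3://your-bucketname/install-json4s.sh \
      --instance-type m3.xlarge \
      --instance-count 3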