NotSerializableException with json4s on Spark

Posted 2020-04-02 10:03

Question:

Basically, I have to analyze some complex JSON documents on HDFS with Spark.

I use for comprehensions to (pre)filter the JSON and the extract method of json4s to wrap the result in a case class.

This version works fine:

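import org.apache.spark.rdd.RDD
import org.json4s._
// Assumption: the jackson backend; org.json4s.native.JsonMethods._ is used the same way.
import org.json4s.jackson.JsonMethods._

def foo(rdd: RDD[String]) = {

  case class View(C: String, b: Option[Array[List[String]]], t: Time)
  case class Time($numberLong: String)
  implicit val formats = DefaultFormats

  rdd.map { jsonString =>
    val jsonObj = parse(jsonString)
    // Collect every value nested under a "v" object.
    val listsOfView = for {
      JObject(value) <- jsonObj
      JField("v", JObject(views)) <- value
      normalized <- views.map(x => x._2)
    } yield normalized
  }
}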

So far so good!

When I try to extract the (pre)filtered JSON into my case class, I get this:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.json4s.DefaultFormats$

Here is the code with the extraction:

def foo(rdd: RDD[String]) = {

  case class View(C: String, b: Option[Array[List[String]]], t: Time)
  case class Time($numberLong: String)
  implicit val formats = DefaultFormats

  rdd.map { jsonString =>
    val jsonObj = parse(jsonString)
    val listsOfView = for {
      JObject(value) <- jsonObj
      JField("v", JObject(views)) <- value
      normalized <- views.map(x => x._2)
    } yield normalized.extract[View] // extract uses the implicit formats, so the closure captures it
  }
}

I have already tried this code in a Scala worksheet, and it works there. I'm really new to HDFS and Spark, so I would appreciate a hint.

Answer 1:

Spark serializes the closures passed to RDD transformations and ships them to the workers for distributed execution. This requires that everything referenced inside the closure (and often the containing object as well) be serializable.
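To illustrate the capture problem without Spark, here is a minimal sketch that uses plain Java serialization. The names ClosureCheck, NotShippable, canSerialize, capturingClosure and selfContainedClosure are hypothetical and only stand in for the situation in the question. Spark's own closure handling does more than this, so treat it as an approximation of the check that fails.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCheck {
  // Hypothetical stand-in for a non-serializable helper such as the old DefaultFormats.
  class NotShippable(val suffix: String)

  // Returns true if Java serialization accepts the value,
  // false if it hits a NotSerializableException.
  def canSerialize(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(value); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  // The function captures the local `helper`, so serializing the function
  // also tries to serialize the non-serializable helper.
  def capturingClosure(): String => String = {
    val helper = new NotShippable("!")
    (s: String) => s + helper.suffix
  }

  // The helper is created inside the function body, so nothing is captured.
  def selfContainedClosure(): String => String =
    (s: String) => { val helper = new NotShippable("!"); s + helper.suffix }
}
```

Under these assumptions, canSerialize should reject the result of capturingClosure() and accept the result of selfContainedClosure(), which mirrors the difference between the two versions of foo in the question: only the second one uses formats inside the closure.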

Looking at the implementation of org.json4s.DefaultFormats$ (the companion object of that trait):

object DefaultFormats extends DefaultFormats {
    val losslessDate = new ThreadLocal(new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
    val UTC = TimeZone.getTimeZone("UTC")

}

It's clear that this object is not serializable and cannot be made so (a ThreadLocal is by its nature non-serializable).

You don't seem to be using Date types in your code, so could you get rid of implicit val formats = DefaultFormats, or replace DefaultFormats with something serializable?
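If neither option is available, another way to keep the formats out of the serialized closure is to create them on the executor. The following is only a sketch under several assumptions: the jackson backend of json4s, the case classes moved to the top level (json4s instantiates them reflectively), and the hypothetical names ViewExtraction and extractViews.

```scala
import org.apache.spark.rdd.RDD
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Top-level case classes, taken from the question.
case class Time($numberLong: String)
case class View(C: String, b: Option[Array[List[String]]], t: Time)

object ViewExtraction {
  // Hypothetical rewrite of foo: returns the extracted views for each input line.
  def extractViews(rdd: RDD[String]): RDD[List[View]] =
    rdd.mapPartitions { lines =>
      // Defined inside the closure, once per partition, so the driver
      // never has to serialize DefaultFormats.
      implicit val formats: Formats = DefaultFormats
      lines.map { jsonString =>
        for {
          JObject(fields) <- parse(jsonString)
          JField("v", JObject(views)) <- fields
          (_, view) <- views
        } yield view.extract[View]
      }
    }
}
```

Using mapPartitions instead of map is a design choice rather than a requirement: it sets up the formats once per partition instead of once per record.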



Answer 2:

This has since been fixed: json4s is serializable as of version 3.3.0. See https://github.com/json4s/json4s/issues/137



Answer 3:

What solved my issue was declaring implicit val formats = DefaultFormats inside the rdd.foreach {} block. That resolved the serialization exception.

Here's my code snippet which solved the issue:

case class rfId(rfId: String) {}

// ... some code here ...

 rdd.foreach { record =>
    val value = record.value()

    // Declared inside the closure, so the formats are looked up on the executor
    // and never serialized as part of the task
    implicit val formats = DefaultFormats
    val json = parse(value)
    println(json.camelizeKeys.extract[rfId])  // Prints `rfId(ABC12345678)`
 }