Basically, i have to analyze some complex JSON's on HDFS with Spark.
I use "for comprehensions" to (pre)filter the JSON's and "extract" method of json4s to wrap it into a case class
This one works fine!
def foo(rdd: RDD[String]) = {
case class View(C: String,b: Option[Array[List[String]]], t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats
rdd.map { jsonString =>
val jsonObj = parse(jsonString)
val listsOfView = for {
JObject(value) <- jsonObj
JField(("v"), JObject(views)) <- value
normalized <- views.map(x => (x._2))
} yield normalized
}
So far so good!
When i try to extract the (pre)filtered JSON to my CaseClass i get this:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.json4s.DefaultFormats$
here the code with extraction:
def foo(rdd: RDD[String]) = {
case class View(C: String,b: Option[Array[List[String]]], t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats
rdd.map { jsonString =>
val jsonObj = parse(jsonString)
val listsOfView = for {
JObject(value) <- jsonObj
JField(("v"), JObject(views)) <- value
normalized <- views.map(x => (x._2))
} yield normalized.extract[View]
}
i have already tried my code on a scala ws, and its work! Im really new on things with hdfs and spark, so i would be appreciate a hint.
Spark serializes the closures on the RDD transformations and 'ships' those to the workers for distributed execution. That mandates that all code within the closure (and often also in the containing object) should be serializable.
Looking that the impl of org.json4s.DefaultFormat$ (the companion object of that trait):
It's clear that this object is not serializable and cannot be made so. (ThreadLocal is by its own nature non-serializable)
You don't seem to be using
Date
types on your code, so could you get rid ofimplicit val formats = DefaultFormats
or replace DefaultFormats by something serializable?This has actually now been fixed; JSON4S is serializable as of version 3.3.0: https://github.com/json4s/json4s/issues/137
What solved my issue was, I used
implicit val formats = DefaultFormats
inrdd.foreach{}
loop. It resolved my serializable Exception.Here's my code snippet which solved the issue: