Basically, I have to analyze some complex JSONs on HDFS with Spark.
I use for comprehensions to (pre)filter the JSONs and the extract method of json4s to wrap them into a case class.
This one works fine:
import org.apache.spark.rdd.RDD
import org.json4s._
import org.json4s.jackson.JsonMethods._  // or org.json4s.native.JsonMethods._

def foo(rdd: RDD[String]) = {
  case class View(C: String, b: Option[Array[List[String]]], t: Time)
  case class Time($numberLong: String)
  implicit val formats = DefaultFormats

  rdd.map { jsonString =>
    val jsonObj = parse(jsonString)
    // keep only the values nested under the "v" field
    val listsOfView = for {
      JObject(value) <- jsonObj
      JField("v", JObject(views)) <- value
      normalized <- views.map(_._2)
    } yield normalized
  }
}
So far so good!
When I try to extract the (pre)filtered JSON into my case class, I get this:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.json4s.DefaultFormats$
Here is the code with the extraction:
def foo(rdd: RDD[String]) = {
  case class View(C: String, b: Option[Array[List[String]]], t: Time)
  case class Time($numberLong: String)
  implicit val formats = DefaultFormats

  rdd.map { jsonString =>
    val jsonObj = parse(jsonString)
    val listsOfView = for {
      JObject(value) <- jsonObj
      JField("v", JObject(views)) <- value
      normalized <- views.map(_._2)
    } yield normalized.extract[View]  // same as above, but now extract into the case class
  }
}
I have already tried my code in a plain Scala worksheet, and it works! I'm really new to HDFS and Spark, so I would appreciate a hint.
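For completeness, here is roughly the standalone version I tried outside Spark; the sample JSON string and the jackson backend are just assumptions for this sketch (my real documents are more complex), but it shows what I mean by "it works in a worksheet":

import org.json4s._
import org.json4s.jackson.JsonMethods._

// same case classes as above, defined at the top level here
case class Time($numberLong: String)
case class View(C: String, b: Option[Array[List[String]]], t: Time)

object LocalExtract extends App {
  implicit val formats = DefaultFormats

  // made-up stand-in for one of the real documents on HDFS
  val jsonString =
    """{"v":{"someView":{"C":"c1","t":{"$numberLong":"1400000000000"}}}}"""

  val listsOfView = for {
    JObject(value) <- parse(jsonString)
    JField("v", JObject(views)) <- value
    normalized <- views.map(_._2)
  } yield normalized.extract[View]

  // should print something like: List(View(c1,None,Time(1400000000000)))
  println(listsOfView)
}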