I am facing a strange issue with Scala/Spark (1.5) and Zeppelin:
If I run the following Scala/Spark code, it runs properly:
// TEST NO PROBLEM SERIALIZATION
val rdd = sc.parallelize(Seq(1, 2, 3))
val testList = List[String]("a", "b")
rdd.map { a =>
  val aa = testList(0)
  None
}
However, after declaring a custom DataFrame extension as proposed here
//DATAFRAME EXTENSION
import org.apache.spark.sql.DataFrame

object ExtraDataFrameOperations {
  implicit class DFWithExtraOperations(df: DataFrame) {
    // drop several columns
    def drop(colToDrop: Seq[String]): DataFrame = {
      var df_temp = df
      colToDrop.foreach { case (f: String) =>
        df_temp = df_temp.drop(f) // can be improved with Spark 2.0
      }
      df_temp
    }
  }
}
and using it, for example, as follows:
//READ ALL THE FILES INTO different DF and save into map
import ExtraDataFrameOperations._

val filename = "myInput.csv"
val delimiter = ","
val colToIgnore = Seq("c_9", "c_10")
val inputICFfolder = "hdfs:///group/project/TestSpark/"

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")       // use the first line of each file as header
  .option("inferSchema", "false") // do not infer data types: all DFs must be merged later, with potential null values, so keep strings only
  .option("delimiter", delimiter)
  .option("charset", "UTF-8")
  .load(inputICFfolder + filename)
  .drop(colToIgnore)              // call the custom DataFrame drop
This runs successfully.
Now, if I run the following code again (same as above):
// TEST NO PROBLEM SERIALIZATION
val rdd = sc.parallelize(Seq(1, 2, 3))
val testList = List[String]("a", "b")
rdd.map { a =>
  val aa = testList(0)
  None
}
I get the error message:
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at :32
testList: List[String] = List(a, b)
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:314)
    ...
Caused by: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$ExtraDataFrameOperations$
Serialization stack:
    - object not serializable (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$ExtraDataFrameOperations$, value: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$ExtraDataFrameOperations$@6c7e70e)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: ExtraDataFrameOperations$module, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$ExtraDataFrameOperations$)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@4c6d0802)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    ...
I don't understand:
- Why does this error occur even though no operation is performed on a DataFrame?
- Why is "ExtraDataFrameOperations" not serializable, when it was used successfully before?
UPDATE:
Trying with
@inline val testList = List[String]("a", "b")
does not help.
Just add 'extends Serializable'. This works for me.
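A minimal sketch of that fix, reusing the names from the question:

// Making the wrapper object (and the implicit class) serializable, so the
// REPL closure that references it can be shipped to the executors.
import org.apache.spark.sql.DataFrame

object ExtraDataFrameOperations extends Serializable {
  implicit class DFWithExtraOperations(df: DataFrame) extends Serializable {
    // drop several columns at once
    def drop(colToDrop: Seq[String]): DataFrame = {
      var df_temp = df
      colToDrop.foreach { f => df_temp = df_temp.drop(f) }
      df_temp
    }
  }
}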
It looks like Spark tries to serialize the whole scope around testList. Try to inline the data: @inline val testList = List[String]("a", "b"), or use a different object in which you store the functions/data that you pass to the drivers.
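A minimal sketch of that second suggestion (the object name DriverData is hypothetical, just to illustrate keeping the shared data out of the surrounding REPL scope):

// Hypothetical holder object: the closure refers to DriverData.testList
// instead of a local val captured from the notebook cell's scope.
object DriverData extends Serializable {
  val testList = List[String]("a", "b")
}

val rdd = sc.parallelize(Seq(1, 2, 3))
rdd.map { a =>
  val aa = DriverData.testList(0)
  None
}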