I'm writing a custom Spark RDD implementation in Scala, and I'm debugging my implementation using the Spark shell. My goal for now is to get:
customRDD.count
to succeed without an Exception. Right now this is what I'm getting:
15/03/06 23:02:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/03/06 23:02:32 ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it.
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)
...
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
at java.io.ObjectStreamClass$FieldReflector.getObjFieldValues(ObjectStreamClass.java:2050)
at java.io.ObjectStreamClass.getObjFieldValues(ObjectStreamClass.java:1252)
... 45 more
The "failed to serialize task 0" catches my attention. I don't have an outstanding mental picture of what's going on I do customRDD.count
, and it's very unclear exactly what could not be serialized.
My custom RDD consists of:
- custom RDD class
- custom Partition class
- custom (scala) Iterator class
My Spark shell session looks like this:
import custom.rdd.stuff
import org.apache.spark.SparkContext
val conf = sc.getConf
conf.set(custom, parameters)
sc.stop
val sc2 = new SparkContext(conf)
val mapOfThings: Map[String, String] = ...
val myRdd = customRDD(sc2, mapOfThings)
myRdd.count
... (exception output) ...
What I'd like to know is:
- For the purposes of creating a custom RDD class, what needs to be "serializable"?
- What does it mean to be "serializable", as far as Spark is concerned? Is this akin to Java's "Serializable"?
- Does all the data returned by my RDD's Iterator (returned by the compute method) also need to be serializable?
Thank you so much for any clarification on this issue.
In addition to Kenny's explanation, I would suggest you turn on serialization debugging to see what's causing the problem. Often it's humanly impossible to figure out just by looking at the code.
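For example (treat the exact invocation as a sketch, since it depends on how you launch the shell, but the two config keys are standard Spark options), you can pass the JVM's extended serialization debug flag to both the driver and the executors when starting spark-shell:

./bin/spark-shell \
  --conf "spark.driver.extraJavaOptions=-Dsun.io.serialization.extendedDebugInfo=true" \
  --conf "spark.executor.extraJavaOptions=-Dsun.io.serialization.extendedDebugInfo=true"

With that flag set, the serialization failure prints the chain of object references that led to the non-serializable field, which usually points straight at the culprit.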
Code executed against a Spark context ultimately runs inside the worker (executor) process that a given task is assigned to. This means that care must be taken to ensure that any objects or values referenced in your RDD customizations are serializable, because Spark has to ship them from the driver to those processes. If an object is not serializable, then you need to make sure it is properly scoped so that each partition creates a new instance of that object.
Basically, you can't share a non-serializable instance of an object declared on your Spark driver and expect its state to be replicated to other nodes on your cluster.
This is an example that will fail to serialize the non-serializable object:
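A minimal sketch of that failing case, assuming sc is an existing SparkContext as in the shell (the NotSerializable class and its transform method are hypothetical names, not Spark API):

// Hypothetical helper class; note that it does NOT extend java.io.Serializable.
class NotSerializable {
  def transform(s: String): String = s.toUpperCase
}

// Instantiated on the driver...
val helper = new NotSerializable

// ...and captured by the map closure, so Spark must serialize it with the task,
// which fails with "Task not serializable" / NotSerializableException.
val result = sc.parallelize(Seq("a", "b", "c"))
  .map(s => helper.transform(s))
  .collect()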
The example below works fine: because the non-serializable object is created inside the lambda, the work can be distributed to multiple partitions without Spark ever needing to serialize the instance's state. The same goes for non-serializable transitive dependencies referenced as part of your RDD customization (if any).
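A corresponding sketch that works, using the same hypothetical NotSerializable class: the instance is constructed inside mapPartitions, so each partition creates its own copy on the executor and nothing non-serializable ever crosses the driver/executor boundary:

val result = sc.parallelize(Seq("a", "b", "c"))
  .mapPartitions { iter =>
    // Created on the executor, once per partition; never serialized by Spark.
    val helper = new NotSerializable
    iter.map(helper.transform)
  }
  .collect()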
See here for more details: http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html
The problem is that you are passing the SparkContext (driver-side boilerplate) into your customRDD call (customRDD(sc2, mapOfThings)). The SparkContext itself is not serializable, so make sure the class that receives it can still be serialized, i.e. that it does not hold onto the context as an ordinary field.
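For illustration, here is a rough sketch of how a custom RDD can take the SparkContext without trying to serialize it: hand the context to the RDD superclass constructor (which, in current Spark versions, stores it as a transient field) rather than keeping your own reference to it. The class names, the config map, and the Partition fields below are hypothetical, not the asker's actual code:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition: shipped to executors with each task, so it must be serializable.
class CustomPartition(override val index: Int, val params: Map[String, String])
  extends Partition

class CustomRDD(sc: SparkContext, config: Map[String, String])
  extends RDD[String](sc, Nil) {   // sc goes to the superclass, which keeps it transient

  override protected def getPartitions: Array[Partition] =
    Array(new CustomPartition(0, config))

  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    val p = split.asInstanceOf[CustomPartition]
    p.params.iterator.map { case (k, v) => s"$k=$v" }
  }
}

If your RDD (or its Partition or Iterator) does need to keep a reference to the SparkContext or other driver-only objects, mark those fields @transient so they are dropped when the task is serialized.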