Serialization and Custom Spark RDD Class

Posted 2019-08-14 05:55

I'm writing a custom Spark RDD implementation in Scala, and I'm debugging my implementation using the Spark shell. My goal for now is to get:

customRDD.count

to succeed without an Exception. Right now this is what I'm getting:

15/03/06 23:02:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/03/06 23:02:32 ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it.
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)

...

Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
    at java.io.ObjectStreamClass$FieldReflector.getObjFieldValues(ObjectStreamClass.java:2050)
    at java.io.ObjectStreamClass.getObjFieldValues(ObjectStreamClass.java:1252)
    ... 45 more

The "failed to serialize task 0" catches my attention. I don't have an outstanding mental picture of what's going on I do customRDD.count, and it's very unclear exactly what could not be serialized.

My custom RDD consists of:

  • a custom RDD class
  • a custom Partition class
  • a custom (Scala) Iterator class

My Spark shell session looks like this:

import custom.rdd.stuff
import org.apache.spark.SparkContext

val conf = sc.getConf
conf.set(custom, parameters)
sc.stop
val sc2 = new SparkContext(conf)
val mapOfThings: Map[String, String] = ...
val myRdd = customRDD(sc2, mapOfThings)
myRdd.count

... (exception output) ...

What I'd like to know is:

  • For the purposes of creating a custom RDD class, what needs to be "serializable"?
  • What does it mean to be "serializable", as far as Spark is concerned? Is this akin to Java's "Serializable"?
  • Does all of the data returned by my RDD's Iterator (the one returned by the compute method) also need to be serializable?

Thank you so much for any clarification on this issue.

3 Answers

我命由我不由天 · 2019-08-14 06:32

In addition to Kenny's explanation, I would suggest you turn on serialization debugging to see what's causing the problem. Often it's humanly impossible to figure out just by looking at the code.

-Dsun.io.serialization.extendedDebugInfo=true
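
The flag has to be set on the driver JVM when it starts, so you would typically pass it at launch time, for example (assuming you start the shell with spark-shell):

spark-shell --driver-java-options "-Dsun.io.serialization.extendedDebugInfo=true"

With it enabled, the serialization error message usually includes the chain of object fields that was being walked when serialization failed, which points much more directly at the member that could not be serialized.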

forever°为你锁心 · 2019-08-14 06:50

Code executed against an RDD runs inside the worker process where the task is scheduled, so everything that code references has to be shipped to that process. This means care must be taken to ensure that any objects or values referenced in your RDD customizations are serializable. If an object is not serializable, you need to scope it so that each partition creates a new instance of it locally.

Basically, you can't share a non-serializable instance of an object declared on your Spark driver and expect its state to be replicated to the other nodes in your cluster.

This is an example that fails, because Spark tries to serialize the non-serializable object captured by the lambda:

// Created on the driver
NotSerializable notSerializable = new NotSerializable();
JavaRDD<String> rdd = sc.textFile("/tmp/myfile");

// The lambda captures notSerializable, so Spark must serialize it to ship the task, and fails
rdd.map(s -> notSerializable.doSomething(s)).collect();

The example below works fine: because the non-serializable object is constructed inside the lambda, a fresh instance is created on the executor for each partition, and its state never has to be serialized and shipped from the driver. The same approach applies to any non-serializable transitive dependencies referenced as part of your RDD customization.

rdd.foreachPartition(iter -> {
  // Created on the executor, once per partition; nothing is shipped from the driver
  NotSerializable notSerializable = new NotSerializable();

  // ...Now process iter
});
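
Since the question is written in Scala, the same pattern there looks like this (a minimal sketch: NotSerializable stands in for whichever of your classes cannot be serialized, and sc is the shell's SparkContext):

// Stand-in for any class that cannot be serialized
class NotSerializable {
  def doSomething(s: String): Unit = println(s.length)
}

val rdd = sc.textFile("/tmp/myfile")
rdd.foreachPartition { iter =>
  // Constructed on the executor, once per partition, so it never has to be
  // serialized and shipped from the driver
  val notSerializable = new NotSerializable()
  iter.foreach(s => notSerializable.doSomething(s))
}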

See here for more details: http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html

祖国的老花朵 · 2019-08-14 06:54

The problem is likely that you are passing the SparkContext (sc2) into your customRDD (the customRDD(sc2, mapOfThings) call). A SparkContext is not serializable, so if your class holds on to it as a field, the whole task fails to serialize. Hand the context only to the RDD superclass constructor (which stores it in a @transient field) and make sure the fields your class does keep are serializable.
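
To make this concrete, here is a minimal sketch of how such a class could be laid out (all names are illustrative, not the asker's actual code): the SparkContext is handed only to the RDD superclass constructor, the Partition carries nothing but serializable values, and anything non-serializable is created inside compute on the executor.

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Partition already extends Serializable; keep only serializable fields in it
class CustomPartition(override val index: Int, val slice: Map[String, String]) extends Partition

class CustomRDD(sc: SparkContext, mapOfThings: Map[String, String])
  extends RDD[String](sc, Nil) {  // Nil = no parent RDDs; sc is not kept as a field of this class

  // A single partition holding the whole map, just to keep the sketch small
  override protected def getPartitions: Array[Partition] =
    Array[Partition](new CustomPartition(0, mapOfThings))

  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    val part = split.asInstanceOf[CustomPartition]
    // Anything non-serializable (connections, native handles, ...) should be
    // created here, on the executor, rather than stored as a field of the RDD
    part.slice.iterator.map { case (k, v) => s"$k=$v" }
  }
}

With a layout like this, new CustomRDD(sc2, mapOfThings).count only has to serialize the RDD's serializable fields and the CustomPartition, so the task should ship without the exception above.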
