我正在写在斯卡拉定制星火RDD实现,而我使用Spark壳调试我的实现。 我现在的目标是获得:
customRDD.count
无异常成功。 现在,这是我得到:
15/03/06 23:02:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/03/06 23:02:32 ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it.
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)
...
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
at java.io.ObjectStreamClass$FieldReflector.getObjFieldValues(ObjectStreamClass.java:2050)
at java.io.ObjectStreamClass.getObjFieldValues(ObjectStreamClass.java:1252)
... 45 more
“无法连载任务0”抓住了我的注意。 我没有什么事情我做的一个优秀的心理画面customRDD.count
,这是很清楚什么不能被序列化。
我的自定义RDD包括:
- 定制RDD类
- 定制的分区类
- 定制(斯卡拉)迭代器类
我的星火shell会话看起来是这样的:
import custom.rdd.stuff
import org.apache.spark.SparkContext
val conf = sc.getConf
conf.set(custom, parameters)
sc.stop
sc2 = new SparkContext(conf)
val mapOfThings: Map[String, String] = ...
myRdd = customRDD(sc2, mapOfStuff)
myRdd.count
... (exception output) ...
我想知道的是:
- 创建自定义RDD类的目的,有什么需要“序列化”?
- 这是什么意思是“序列化”,据星火是什么呢? 这是类似于Java的“序列化”?
- 不要从我RDD的迭代器返回的所有数据(由返回的
compute
方法),还需要序列化?
非常感谢你对任何澄清这个问题。