序列化和定制星火RDD类(Serialization and Custom Spark RDD Cl

2019-10-22 04:19发布

我正在写在斯卡拉定制星火RDD实现,而我使用Spark壳调试我的实现。 我现在的目标是获得:

customRDD.count

无异常成功。 现在,这是我得到:

15/03/06 23:02:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/03/06 23:02:32 ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it.
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)

...

Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
    at java.io.ObjectStreamClass$FieldReflector.getObjFieldValues(ObjectStreamClass.java:2050)
    at java.io.ObjectStreamClass.getObjFieldValues(ObjectStreamClass.java:1252)
    ... 45 more

“无法连载任务0”抓住了我的注意。 我没有什么事情我做的一个优秀的心理画面customRDD.count ,这是很清楚什么不能被序列化。

我的自定义RDD包括:

  • 定制RDD类
  • 定制的分区类
  • 定制(斯卡拉)迭代器类

我的星火shell会话看起来是这样的:

import custom.rdd.stuff
import org.apache.spark.SparkContext

val conf = sc.getConf
conf.set(custom, parameters)
sc.stop
sc2 = new SparkContext(conf)
val mapOfThings: Map[String, String] = ...
myRdd = customRDD(sc2, mapOfStuff)
myRdd.count

... (exception output) ...

我想知道的是:

  • 创建自定义RDD类的目的,有什么需要“序列化”?
  • 这是什么意思是“序列化”,据星火是什么呢? 这是类似于Java的“序列化”?
  • 不要从我RDD的迭代器返回的所有数据(由返回的compute方法),还需要序列化?

非常感谢你对任何澄清这个问题。

Answer 1:

在星火上下文中执行代码需要在任务被指示执行的工作节点的同一过程边界内生存。 这意味着,必须小心,以确保您的自定义RDD引用的任何对象或值是可序列化。 如果对象是不可序列化,那么你需要确保他们得到适当的作用域,使每个分区都有该对象的新实例。

基本上,你可以不同意你的星火程序声明对象的非序列化的实例,并预计其状态被复制到群集上的其他节点。

这是将无法序列化非序列化的对象的示例:

NotSerializable notSerializable = new NotSerializable();
JavaRDD<String> rdd = sc.textFile("/tmp/myfile");

rdd.map(s -> notSerializable.doSomething(s)).collect();

下面的例子将正常工作,因为它是在一个拉姆达的情况下,可以适当地分配给多个分区,而无需序列化不可序列化对象的实例的状态。 这还要对作为RDD定制的部分(如有)引用的非序列化的传递依赖。

rdd.forEachPartition(iter -> {
  NotSerializable notSerializable = new NotSerializable();

  // ...Now process iter
});

在这里看到更多的细节: http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html



Answer 2:

除了肯尼的解释,我建议你打开序列化调试,看看有什么是造成问题。 通常它是从人的角度不可能仅仅通过看代码搞清楚。

-Dsun.io.serialization.extendedDebugInfo=true


Answer 3:

问题是,你逝去的SparkContex(锅炉板)在customRdd方法(customRDD(SC2,mapOfStuff))。 确保您的类也序列化这使得SparkContext。



文章来源: Serialization and Custom Spark RDD Class