Apache Spark, NotSerializableException: org.apache

Posted 2019-07-29 04:43

Here is my code:

  val bg = imageBundleRDD.first()    // bg: [Text, BundleWritable]
  val res = imageBundleRDD.map(data => {
    val desBundle = colorToGray(bg._2)        // lineA: NotSerializableException: org.apache.hadoop.io.Text
    // val desBundle = colorToGray(data._2)   // lineB: everything is ok
    (data._1, desBundle)
  })
  println(res.count)

lineB works, but lineA fails with: org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.hadoop.io.Text

I tried to use Kryo to solve the problem, but nothing seems to have changed:

import com.esotericsoftware.kryo.Kryo
import org.apache.hadoop.io.Text
import org.apache.spark.serializer.KryoRegistrator

// BundleWritable (the value type used above) must also be in scope here.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Text])
    kryo.register(classOf[BundleWritable])
  }
}

System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "hequn.spark.reconstruction.MyRegistrator")
val sc = new SparkContext(...

Thanks!!!

3 answers
冷血范
Reply #2 · 2019-07-29 05:07

The reason your code has the serialization problem is that your Kryo setup, while close, isn't quite right:

change:

System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "hequn.spark.reconstruction.MyRegistrator")
val sc = new SparkContext(...

to:

import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf()
  // ... set master, appname, etc, then:
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "hequn.spark.reconstruction.MyRegistrator")

val sc = new SparkContext(sparkConf)
冷血范
Reply #3 · 2019-07-29 05:12

I had a similar problem when my Java code was reading sequence files containing Text keys. I found this post helpful:

http://apache-spark-user-list.1001560.n3.nabble.com/How-to-solve-java-io-NotSerializableException-org-apache-hadoop-io-Text-td2650.html

In my case, I converted Text to a String using map:

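// Note: on Spark 1.0 and later the Java API names this method mapToPair;
// map with a PairFunction is the pre-1.0 form.
JavaPairRDD<String, VideoRecording> mapped = videos.map(new PairFunction<Tuple2<Text,VideoRecording>,String,VideoRecording>() {
    @Override
    public Tuple2<String, VideoRecording> call(
            Tuple2<Text, VideoRecording> kv) throws Exception {
        // Necessary to copy value as Hadoop chooses to reuse objects
        VideoRecording vr = new VideoRecording(kv._2);
        // Text is not java.io.Serializable, so the key is converted to a String
        return new Tuple2<String, VideoRecording>(kv._1.toString(), vr);
    }
});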
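
For readers who want to see why the String conversion matters without starting a Spark job, here is a minimal, Spark-free sketch. It assumes plain Java serialization (the mechanism named in the java.io.NotSerializableException above). `FakeText`, `SerializableFunction`, and `ClosureCaptureDemo` are hypothetical names made up for this illustration; `FakeText` only stands in for org.apache.hadoop.io.Text, which does not implement java.io.Serializable. Spark's real closure handling is more involved than this.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class ClosureCaptureDemo {

    // Hypothetical stand-in for org.apache.hadoop.io.Text: deliberately NOT Serializable.
    static final class FakeText {
        private final String value;
        FakeText(String value) { this.value = value; }
        @Override public String toString() { return value; }
    }

    // A function type that must be Java-serializable, like code shipped to a worker.
    interface SerializableFunction<A, B> extends Serializable {
        B apply(A input);
    }

    // Returns true if Java serialization accepts the object,
    // false if it throws NotSerializableException.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // Captures the non-serializable object itself (analogous to lineA in the question).
    static SerializableFunction<String, String> capturingText(FakeText bg) {
        return data -> data + ":" + bg;
    }

    // Captures only a String copy taken before the function is created.
    static SerializableFunction<String, String> capturingString(FakeText bg) {
        String copy = bg.toString();
        return data -> data + ":" + copy;
    }

    public static void main(String[] args) {
        FakeText bg = new FakeText("first-key");
        System.out.println(canSerialize(capturingText(bg)));   // prints false
        System.out.println(canSerialize(capturingString(bg))); // prints true
    }
}
```

The same idea applies to the question's `bg`: anything the function body references from outside has to be serializable, so extracting a serializable value first (here a String) sidesteps the exception.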

Be aware of this note in the API documentation for the sequenceFile method in JavaSparkContext:

Note: Because Hadoop's RecordReader class re-uses the same Writable object for each record, directly caching the returned RDD will create many references to the same object. If you plan to directly cache Hadoop writable objects, you should first copy them using a map function.
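
The effect described in that note can be reproduced in a few lines of plain Java. This is only a sketch under stated assumptions: `MutableRecord` is a hypothetical stand-in for a Hadoop Writable, and `readAll` merely imitates a reader that overwrites one shared instance per record. Neither is part of Hadoop or Spark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ReusedRecordDemo {

    // Hypothetical mutable record, standing in for a Hadoop Writable.
    static final class MutableRecord {
        String value;
        MutableRecord() {}
        // Copy constructor, like new VideoRecording(kv._2) in the answer above.
        MutableRecord(MutableRecord other) { this.value = other.value; }
    }

    // Imitates a reader that overwrites one shared instance for every input,
    // keeping either the shared reference or a fresh copy of it.
    static List<String> readAll(List<String> inputs, boolean copyEachRecord) {
        MutableRecord shared = new MutableRecord();
        List<MutableRecord> kept = new ArrayList<>();
        for (String input : inputs) {
            shared.value = input;
            kept.add(copyEachRecord ? new MutableRecord(shared) : shared);
        }
        // Read back what the kept references hold once reading has finished.
        List<String> seen = new ArrayList<>();
        for (MutableRecord r : kept) {
            seen.add(r.value);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> inputs = Arrays.asList("a", "b", "c");
        System.out.println(readAll(inputs, false)); // prints [c, c, c]
        System.out.println(readAll(inputs, true));  // prints [a, b, c]
    }
}
```

Without the copy, every kept reference points at the same object and shows only the last record, which is why the answer copies the value inside the map function before keeping it.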

Root(大扎)
Reply #4 · 2019-07-29 05:17

When working with sequence files in Apache Spark, the following applies:

 -- Use the Java-equivalent data types in place of the Hadoop data types.
 -- Spark automatically converts the Writables into their Java-equivalent types.

Example: suppose we have a sequence file "xyz" whose key type is Text and whose
value type is LongWritable. When we create an RDD from this file, we use the
Java-equivalent data types, i.e. String and Long respectively.

 val mydata = sc.sequenceFile[String, Long]("path/to/xyz")
 mydata.collect
