JavaSparkContext not serializable

Posted 2020-07-10 07:20

Question:

I'm using Spark with Cassandra, and I have a JavaRDD<String> of clients. For each client, I want to select his interactions from Cassandra like this:

JavaPairRDD<String, List<InteractionByMonthAndCustomer>> a = client.mapToPair(new PairFunction<String, String, List<InteractionByMonthAndCustomer>>() {
        @Override
        public Tuple2<String, List<InteractionByMonthAndCustomer>> call(String s) throws Exception {               
            List<InteractionByMonthAndCustomer> b = javaFunctions(sc)
                    .cassandraTable(CASSANDRA_SCHEMA, "interaction_by_month_customer")
                    .where("ctid =?", s)
                    .map(new Function<CassandraRow, InteractionByMonthAndCustomer>() {
                        @Override
                        public InteractionByMonthAndCustomer call(CassandraRow cassandraRow) throws Exception {
                            return new InteractionByMonthAndCustomer(cassandraRow.getString("channel"),
                                    cassandraRow.getString("motif"),
                                    cassandraRow.getDate("start"),
                                    cassandraRow.getDate("end"),
                                    cassandraRow.getString("ctid"),
                                    cassandraRow.getString("month")
                            );
                        }
                    }).collect();
            return new Tuple2<String, List<InteractionByMonthAndCustomer>>(s, b);
        }
    });

For this I'm using a single JavaSparkContext sc, but I get this error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.map(RDD.scala:270)
at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:99)
at org.apache.spark.api.java.JavaRDD.mapToPair(JavaRDD.scala:32)
at fr.aid.cim.spark.dao.GenrateCustumorJourney.AllCleintInteractions(GenrateCustumorJourney.java:91)
at fr.aid.cim.spark.dao.GenrateCustumorJourney.main(GenrateCustumorJourney.java:75)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 14 more

I think the JavaSparkContext must be serializable. How can I make it serializable?

Thank you.

Answer 1:

No, JavaSparkContext is not serializable and is not supposed to be. It can't be used in a function you send to remote workers. Here you're not explicitly referencing it, but a reference is being serialized anyway because your anonymous inner class function is not static and therefore holds a reference to the enclosing instance.

Try rewriting your code with this function as a static, stand-alone object.
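
The capture can be reproduced without Spark at all. The following is a minimal sketch using plain Java serialization; FakeContext, Fn, Job and StandaloneFn are hypothetical stand-ins (for JavaSparkContext, Spark's function interfaces, the enclosing driver class, and the rewritten function), not anything from the Spark API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCaptureDemo {

    /** Hypothetical stand-in for JavaSparkContext: deliberately NOT Serializable. */
    static class FakeContext { }

    /** Hypothetical stand-in for Spark's serializable function interfaces. */
    interface Fn extends Serializable {
        String call(String s);
    }

    /** Hypothetical stand-in for the driver class that keeps the context in a field. */
    static class Job implements Serializable {
        private final FakeContext sc = new FakeContext();

        /** Anonymous inner class: holds a hidden reference to this Job, and through it to sc. */
        Fn anonymousFn() {
            return new Fn() {
                @Override
                public String call(String s) {
                    return s + sc.hashCode(); // reading sc goes through the hidden Job.this reference
                }
            };
        }
    }

    /** Static nested class: no reference to any enclosing instance. */
    static class StandaloneFn implements Fn {
        @Override
        public String call(String s) {
            return s.toUpperCase();
        }
    }

    /** Returns true if Java serialization accepts the object graph. */
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false; // NotSerializableException is a subclass of IOException
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new Job().anonymousFn())); // prints "false"
        System.out.println(isSerializable(new StandaloneFn()));      // prints "true"
    }
}
```

The anonymous function fails because serializing it walks function -> Job -> FakeContext, which has the same shape as the nested writeObject0 frames in the stack trace above. The static version has nothing to drag along.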



Answer 2:

You cannot use SparkContext and create other RDDs from within an executor (map function of an RDD).

You have to create the Cassandra RDD (sc.cassandraTable) in the driver and then do a join between those two RDDs (client RDD and cassandra table RDD).
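
The shape of that restructuring can be sketched in plain Java, with lists standing in for the two RDDs. This is only a model of the data flow, not connector code: the Interaction class is a hypothetical, trimmed-down stand-in for InteractionByMonthAndCustomer, and the comments name the JavaPairRDD operations (mapToPair, groupByKey, join) that would play the same roles in Spark.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class JoinInsteadOfNestedQuery {

    /** Hypothetical, trimmed-down stand-in for InteractionByMonthAndCustomer. */
    static final class Interaction {
        final String ctid;
        final String channel;

        Interaction(String ctid, String channel) {
            this.ctid = ctid;
            this.channel = channel;
        }
    }

    /**
     * Instead of querying the table once per client (inside the map function),
     * read the table once, key it by ctid, and join it with the client list.
     */
    static Map<String, List<Interaction>> interactionsPerClient(
            List<String> clients, List<Interaction> table) {
        // Key every row by ctid and group (in Spark: mapToPair followed by groupByKey).
        Map<String, List<Interaction>> byCtid =
                table.stream().collect(Collectors.groupingBy(i -> i.ctid));

        // Inner join with the clients (in Spark: join on the two pair RDDs).
        Map<String, List<Interaction>> joined = new LinkedHashMap<>();
        for (String client : clients) {
            List<Interaction> rows = byCtid.get(client);
            if (rows != null) {
                joined.put(client, rows);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String> clients = Arrays.asList("c1", "c2", "c3");
        List<Interaction> table = Arrays.asList(
                new Interaction("c1", "web"),
                new Interaction("c1", "phone"),
                new Interaction("c3", "mail"),
                new Interaction("c9", "web"));
        System.out.println(interactionsPerClient(clients, table).keySet()); // prints "[c1, c3]"
    }
}
```

Note that, like an inner join, this drops clients with no interactions (c2 here); a left outer join would be the choice if they should appear with an empty list.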



Answer 3:

Declare it with the transient keyword:

private transient JavaSparkContext sparkContext;
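
One thing to keep in mind with this approach: a transient field is skipped during serialization, so it is null in the deserialized copy. The sketch below shows this with plain Java serialization; FakeContext and Job are hypothetical stand-ins for JavaSparkContext and the class holding it. The same behavior would mean the field can be kept on the driver side, but not dereferenced inside a function that runs on a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {

    /** Hypothetical stand-in for JavaSparkContext: deliberately NOT Serializable. */
    static class FakeContext { }

    /** Hypothetical holder class with the context declared transient. */
    static class Job implements Serializable {
        private transient FakeContext sparkContext = new FakeContext();

        boolean hasContext() {
            return sparkContext != null;
        }
    }

    /** Serializes and deserializes the job, as shipping it to another JVM would. */
    static Job roundTrip(Job job) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(job); // succeeds: the transient field is skipped
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Job) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Job original = new Job();
        System.out.println(original.hasContext());            // prints "true"
        System.out.println(roundTrip(original).hasContext()); // prints "false"
    }
}
```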