ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy

Question:

I have a Spring Boot microservice that talks to a remote three-node Spark cluster and executes the following logic:

    // Read the JSON file and capture its schema
    Dataset<Row> df = sparkSession.read().json("/opt/enso/test.json");
    StructType schema = df.schema();

    // Pair each row with its index so a range of rows can be selected
    JavaPairRDD<Row, Long> zippedRows = df.toJavaRDD().zipWithIndex();
    JavaPairRDD<Row, Long> filteredRows =
            zippedRows.filter(new Function<Tuple2<Row, Long>, Boolean>() {
                @Override
                public Boolean call(Tuple2<Row, Long> v1) throws Exception {
                    return v1._2 >= 1 && v1._2 <= 5;
                }
            });
    JavaRDD<Row> filteredRdd = filteredRows.keys();

    // Rebuild a Dataset from the filtered rows and collect it as JSON strings
    Dataset<Row> ds = sparkSqlContext.createDataFrame(filteredRdd, schema);
    List<String> mystr = ds.toJSON().collectAsList();

It fails with the above exception on the line

    JavaPairRDD<Row, Long> filteredRows = zippedRows.filter(...)

where the predicate is the anonymous class implementing org.apache.spark.api.java.function.Function.
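In case it helps diagnose this, one variant I have considered is replacing the anonymous inner class with a named, top-level predicate, so that no reference to the enclosing Spring bean gets captured and serialized along with it. This is only a sketch; the class name and index bounds are illustrative:

    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.Row;
    import scala.Tuple2;

    // Hypothetical named predicate: unlike an anonymous inner class, it holds
    // no implicit reference to the enclosing class, so only this small class
    // needs to be serialized and loaded on the executors.
    public class RowIndexRangeFilter implements Function<Tuple2<Row, Long>, Boolean> {
        private final long from;
        private final long to;

        public RowIndexRangeFilter(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public Boolean call(Tuple2<Row, Long> v1) {
            // Keep rows whose zipWithIndex position falls in [from, to]
            return v1._2 >= from && v1._2 <= to;
        }
    }

With that class on the executors' classpath, the filter would become zippedRows.filter(new RowIndexRangeFilter(1, 5)).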

I've looked at the following JIRAs: https://issues.apache.org/jira/browse/SPARK-9219

https://issues.apache.org/jira/browse/SPARK-18075

and everything in https://issues.apache.org/jira/browse/SPARK-19938?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22scala.collection.immutable.List%24SerializationProxy%22

It seems like a class-loading issue, so I've also tried the solution proposed in this question:

error when use filter(),map(),... in spark java api( org.apache.spark.SparkException )
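For reference, a standard way to ship user classes to remote executors is to register the application jar on the SparkConf. A minimal sketch, with a placeholder app name, master URL, and jar path (not my exact configuration):

    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.SparkSession;

    // Sketch only: the app name, master URL, and jar path are placeholders.
    // setJars distributes the listed jars to the executors so that classes
    // like the filter Function can be loaded remotely.
    SparkConf conf = new SparkConf()
            .setAppName("enso-service")
            .setMaster("spark://spark-master:7077")
            .setJars(new String[] { "/opt/enso/my-service.jar" });

    SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();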

I've also tried loading my jar by setting spark.executor.extraClassPath, and I still have the same issue.
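Roughly the kind of setting I mean (the jar path is a placeholder):

    // Sketch only; the path is a placeholder. Note that
    // spark.executor.extraClassPath only prepends to the executor classpath:
    // the jar must already exist at that path on every worker node, because
    // this setting does not distribute the file.
    SparkConf conf = new SparkConf()
            .set("spark.executor.extraClassPath", "/opt/enso/my-service.jar");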

Any pointers?