I have a Spring Boot microservice that talks to a remote Spark cluster with 3 nodes and executes the following logic:
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

Dataset<Row> df = sparkSession.read().json("/opt/enso/test.json");
StructType schema = df.schema();

// Pair every row with its position so rows can be selected by index
JavaPairRDD<Row, Long> zippedRows = df.toJavaRDD().zipWithIndex();

// Keep only the rows with index 1..5 (inclusive)
JavaPairRDD<Row, Long> filteredRows = zippedRows.filter(new Function<Tuple2<Row, Long>, Boolean>() {
    @Override
    public Boolean call(Tuple2<Row, Long> v1) throws Exception {
        return v1._2 >= 1 && v1._2 <= 5;
    }
});

JavaRDD<Row> filteredRdd = filteredRows.keys();
Dataset<Row> ds = sparkSqlContext.createDataFrame(filteredRdd, schema);
List<String> mystr = ds.toJSON().collectAsList();
It fails on the zippedRows.filter(...) call, where the anonymous class implements org.apache.spark.api.java.function.Function, with the exception shown above.
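One detail that may be relevant: an anonymous inner class like this one holds an implicit this$0 reference to the enclosing Spring bean, so Spark has to serialize more than just the filter logic. A capture-free variant I sketched looks like this (IndexRangeFilter is just a name I made up; I haven't confirmed it changes anything in my setup):

// Static nested class in the service: same index-range filter, but with
// no hidden reference to the enclosing Spring bean.
static class IndexRangeFilter implements Function<Tuple2<Row, Long>, Boolean> {
    @Override
    public Boolean call(Tuple2<Row, Long> v1) {
        return v1._2 >= 1 && v1._2 <= 5;
    }
}

// Same pipeline as above, only the filter implementation changes.
JavaPairRDD<Row, Long> filteredRows = zippedRows.filter(new IndexRangeFilter());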
I've looked at the following JIRAs: https://issues.apache.org/jira/browse/SPARK-9219
https://issues.apache.org/jira/browse/SPARK-18075
and everything turned up by this JIRA search for scala.collection.immutable.List$SerializationProxy:
https://issues.apache.org/jira/browse/SPARK-19938?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22scala.collection.immutable.List%24SerializationProxy%22
It seems to be a classloading issue, and I've tried the solution proposed in this question:
error when use filter(),map(),... in spark java api( org.apache.spark.SparkException )
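If I'm reading that answer right, the suggestion is to register the application jar with the Spark context so the executors can fetch it. My attempt looked roughly like this (app name, master URL, and jar path are placeholders for my real values):

import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf()
        .setAppName("enso-service")              // placeholder
        .setMaster("spark://spark-master:7077")  // placeholder
        // Ship the application jar from the driver to every executor.
        .setJars(new String[] { "/opt/enso/my-service.jar" });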
I've also tried putting my application jar on the executor classpath by setting spark.executor.extraClassPath, and I still hit the same exception.
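Concretely, that attempt looked roughly like the following (names and paths are placeholders). As I understand it, spark.executor.extraClassPath only works if the jar already sits at that path on every worker node, whereas spark.jars makes Spark distribute it from the driver:

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
        .appName("enso-service")                 // placeholder
        .master("spark://spark-master:7077")     // placeholder
        // Prepend the jar to each executor JVM's classpath; the file must
        // already exist at this path on every worker node.
        .config("spark.executor.extraClassPath", "/opt/enso/my-service.jar")
        // Alternative: let Spark ship the jar to the executors itself.
        .config("spark.jars", "/opt/enso/my-service.jar")
        .getOrCreate();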
Any pointers?