How to fix java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD

Asked 2020-02-01 19:28

This error has been the hardest to trace. I am not sure what is going on. I am running a Spark cluster on my local machine, so the entire cluster lives on one host (127.0.0.1) and runs in standalone mode.
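For reference, sc is created along these lines; the app name, master URL, and Cassandra host setting shown here are just representative of my single-host setup:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkSetup {
    static JavaSparkContext createContext() {
        // Everything runs on one host: standalone master and Cassandra both on 127.0.0.1.
        SparkConf conf = new SparkConf()
                .setAppName("batchprocessing")
                .setMaster("spark://127.0.0.1:7077")                  // standalone master on localhost
                .set("spark.cassandra.connection.host", "127.0.0.1"); // Cassandra connector setting
        return new JavaSparkContext(conf);
    }
}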

JavaPairRDD<byte[], Iterable<CassandraRow>> cassandraRowsRDD = javaFunctions(sc).cassandraTable("test", "hello")
    .select("rowkey", "col1", "col2", "col3")
    .spanBy(new Function<CassandraRow, byte[]>() {
        @Override
        public byte[] call(CassandraRow v1) {
            return v1.getBytes("rowkey").array();
        }
    }, byte[].class);

Iterable<Tuple2<byte[], Iterable<CassandraRow>>> listOftuples = cassandraRowsRDD.collect(); // ERROR HAPPENS HERE
Tuple2<byte[], Iterable<CassandraRow>> tuple = listOftuples.iterator().next();
byte[] partitionKey = tuple._1();
for(CassandraRow cassandraRow: tuple._2()) {
    System.out.println("************START************");
    System.out.println(new String(partitionKey));
    System.out.println("************END************");
}

The error clearly happens at cassandraRowsRDD.collect(), and I don't know why:

16/10/09 23:36:21 ERROR Executor: Exception in task 2.3 in stage 0.0 (TID 21)
java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Here are the versions I use:

Scala code runner version 2.11.8  // when I run scala -version or even ./spark-shell


compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.0.0'
compile group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.0.0'
compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.0.0'
compile group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.11', version: '2.0.0-M3'

My build.gradle looks like this after introducing a custom "provided" configuration. Gradle does not actually ship one, but the advice I found online was to create it, so the file now looks like this:

group 'com.company'
version '1.0-SNAPSHOT'

apply plugin: 'java'
apply plugin: 'idea'

repositories {
    mavenCentral()
    mavenLocal()
}

configurations {
    provided
}
sourceSets {
    main {
        compileClasspath += configurations.provided
        test.compileClasspath += configurations.provided
        test.runtimeClasspath += configurations.provided
    }
}

idea {
    module {
        scopes.PROVIDED.plus += [ configurations.provided ]
    }
}

dependencies {
    compile 'org.slf4j:slf4j-log4j12:1.7.12'
    provided group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.0.0'
    provided group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.0.0'
    provided group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.0.0'
    provided group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.11', version: '2.0.0-M3'
}



jar {
    from { configurations.provided.collect { it.isDirectory() ? it : zipTree(it) } }
   // with jar
    from sourceSets.test.output
    manifest {
        attributes 'Main-Class': "com.company.batchprocessing.Hello"
    }
    exclude 'META-INF/*.RSA', 'META-INF/*.SF', 'META-INF/*.DSA'
    zip64 true
}

6 Answers
三岁会撩人 · 2020-02-01 19:51

Your call() method should return byte[], like below:

@Override
public byte[] call(CassandraRow v1) {
  return v1.getBytes("rowkey").array();
}

If you still get the issue, check the versions of your dependencies, as mentioned in the Jira ticket https://issues.apache.org/jira/browse/SPARK-9219.
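One quick sanity check is to print the Spark version the running session reports and compare it with what build.gradle declares; a minimal sketch:

import org.apache.spark.sql.SparkSession;

public class VersionCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("version-check").getOrCreate();
        // Compare this against the spark-core / spark-sql version declared in build.gradle.
        System.out.println("Running against Spark " + spark.version());
        spark.stop();
    }
}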

何必那么认真 · 2020-02-01 19:54

I had the same issue and could resolve it by adding my application's jar to Spark's classpath with:

spark = SparkSession.builder()
        .appName("Foo")
        .config("spark.jars", "target/scala-2.11/foo_2.11-0.1.jar")
        .getOrCreate();
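The same effect can also be achieved on the SparkConf, or by adding the jar to an already running session; a rough sketch (the jar path is just the example one from above):

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class JarSetup {
    public static void main(String[] args) {
        // Option 1: declare the application jar(s) on the SparkConf before building the session.
        SparkConf conf = new SparkConf()
                .setAppName("Foo")
                .setJars(new String[]{"target/scala-2.11/foo_2.11-0.1.jar"}); // example path

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        // Option 2: register the jar on an existing session; executors fetch it for later tasks.
        spark.sparkContext().addJar("target/scala-2.11/foo_2.11-0.1.jar");
    }
}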
ら.Afraid · 2020-02-01 20:02

Check your code. In IntelliJ: Analyze -> Inspect Code. If you have deprecated methods related to serialization, fix them. Or simply try to downgrade your Spark or Scala version. In my case I downgraded the Scala version to 2.10 and everything worked.

冷血范 · 2020-02-01 20:14

I have hit the same exception and have dug into multiple related Jiras (9219, 12675, 18075).

I believe the exception name is misleading; the real problem is inconsistent environment settings between the Spark cluster and the driver application.

For example, I started my Spark cluster with the following line in conf/spark-defaults.conf:

spark.master                     spark://master:7077

while I started my driver program (even though it was submitted with spark-submit) with the line:

sparkSession.master("spark://<master ip>:7077")

where <master ip> is the correct IP address of the master node, yet the program failed because of this simple inconsistency.

As a result, I recommend starting all driver applications with spark-submit and not duplicating any configuration in the driver code (unless you need to override something). Just let spark-submit set up your environment the same way the running Spark cluster was set up, as in the sketch below.
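In other words, the driver can stay completely master-agnostic and pick up whatever spark-submit passes in; a minimal sketch (the app name is arbitrary):

import org.apache.spark.sql.SparkSession;

public class Main {
    public static void main(String[] args) {
        // No .master(...) here: the master comes from spark-submit / spark-defaults.conf,
        // so the driver and the cluster cannot disagree about it.
        SparkSession spark = SparkSession.builder()
                .appName("batchprocessing")
                .getOrCreate();

        // ... job logic ...

        spark.stop();
    }
}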

看我几分像从前 · 2020-02-01 20:15

Not using .master("spark://hadoop001:7077") and using .master("local[2]") instead solved my problem.
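Roughly like this (a minimal sketch, app name arbitrary); with a local[*] master the executors live inside the driver JVM, so there is no separate executor classpath to fall out of sync:

import org.apache.spark.sql.SparkSession;

public class LocalRun {
    public static void main(String[] args) {
        // local[2] keeps execution inside the driver JVM with 2 worker threads,
        // so executors always see the same classes as the driver.
        SparkSession spark = SparkSession.builder()
                .appName("batchprocessing")
                .master("local[2]")
                .getOrCreate();
        spark.stop();
    }
}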

Summer. ? 凉城 · 2020-02-01 20:17

In my case I had to add the spark-avro jar (I put it into a /lib folder next to the main jar):

SparkSession spark = SparkSession.builder().appName("myapp").getOrCreate();
...
spark.sparkContext().addJar("lib/spark-avro_2.11-4.0.0.jar");