I'm trying to build a web API for my Apache Spark jobs using the sparkjava.com framework. My code is:
```java
@Override
public void init() {
    get("/hello", (req, res) -> {
        String sourcePath = "hdfs://spark:54310/input/*";
        SparkConf conf = new SparkConf().setAppName("LineCount");
        conf.setJars(new String[] { "/home/sam/resin-4.0.42/webapps/test.war" });
        File configFile = new File("config.properties");
        String sparkURI = "spark://hamrah:7077";
        conf.setMaster(sparkURI);
        conf.set("spark.driver.allowMultipleContexts", "true");
        JavaSparkContext sc = new JavaSparkContext(conf);
        @SuppressWarnings("resource")
        JavaRDD<String> log = sc.textFile(sourcePath);
        JavaRDD<String> lines = log.filter(x -> {
            return true;
        });
        return lines.count();
    });
}
```
If I remove the lambda expression, or put it inside a simple jar rather than a web service (that is, a servlet), it runs without any error. But using a lambda expression inside a servlet results in this exception:
```
15/01/28 10:36:33 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, hamrah): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.f$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaRDD$$anonfun$filter$1
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1999)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
```
P.S.: I tried combinations of Jersey and sparkjava with Jetty, Tomcat, and Resin, and all of them led to the same result.
I had the same error, and replacing the lambda with an inner class made it work. I don't really understand why, and reproducing this error was extremely difficult (we had one server which exhibited the behavior, and nowhere else).
This causes serialization problems (it uses a lambda, triggering the `SerializedLambda` error):
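A sketch of the failing pattern (the class, interface, and field names are illustrative, chosen to match the `MyObject.val$variable` in the error below):

```java
import java.io.Serializable;

public class MyObject implements Serializable {
    // Hypothetical serializable functional interface.
    interface MyInterface extends Serializable {
        void myMethod();
    }

    MyInterface variable;

    MyObject() {
        // A lambda is serialized as a java.lang.invoke.SerializedLambda, which
        // can only be resolved back if the defining class is present on the
        // deserializing side.
        this.variable = () -> System.out.println("hello");
    }
}
```

Deserializing it yields: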
```
java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field MyObject.val$variable
```
The anonymous-inner-class version works.
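A sketch of the working variant, under the same assumptions (the constructor of the hypothetical `MyObject` above):

```java
MyObject() {
    // An anonymous inner class is serialized as an ordinary object of class
    // MyObject$1, which the reader can load like any other class.
    this.variable = new MyInterface() {
        @Override
        public void myMethod() {
            System.out.println("hello");
        }
    };
}
```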
What you have here is a follow-up error which masks the original error.

When lambda instances are serialized, they use `writeReplace` to dissolve their JRE-specific implementation from the persistent form, which is a `SerializedLambda` instance. When the `SerializedLambda` instance has been restored, its `readResolve` method will be invoked to reconstitute the appropriate lambda instance. As the documentation says, it will do so by invoking a special method of the class which defined the original lambda (see also this answer). The important point is that the original class is needed, and that's what's missing in your case.

But there's a …special… behavior of `ObjectInputStream`. When it encounters an exception, it doesn't bail out immediately. It will record the exception and continue the process, marking all objects currently being read, and thus depending on the erroneous object, as erroneous as well. Only at the end of the process will it throw the original exception it encountered. What makes it so strange is that it will also continue trying to set the fields of these objects. But when you look at the method `ObjectInputStream.readOrdinaryObject`, line 1806:
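(a rough excerpt of the relevant check as found in the JDK 8 sources; exact line numbers vary between builds)

```java
// Inside ObjectInputStream.readOrdinaryObject: readResolve is only invoked
// when no exception has been recorded for this handle.
if (obj != null &&
    handles.lookupException(passHandle) == null &&
    desc.hasReadResolveMethod())
{
    Object rep = desc.invokeReadResolve(obj);
    if (unshared && rep.getClass().isArray()) {
        rep = cloneArray(rep);
    }
    if (rep != obj) {
        handles.setObject(passHandle, obj = rep);
    }
}
```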
you see that it doesn't call the `readResolve` method when `lookupException` reports a non-`null` exception. But when the substitution did not happen, it's not a good idea to continue trying to set the field values of the referrer; yet that's exactly what happens here, hence producing a `ClassCastException`.

You can easily reproduce the problem:
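A sketch of such a reproducer (the exact original classes aren't reproduced here; `Holder` is an assumed helper, and the other names follow the instructions below):

```java
import java.io.*;

// All four classes can live in one source file: javac still emits a separate
// .class file for each, so Defining.class can be deleted on its own.

class Holder implements Serializable {
    Runnable r; // receives the (de)serialized lambda
}

class Defining {
    static Holder get() {
        Holder h = new Holder();
        // The intersection cast makes the lambda serializable; resolving it
        // back requires Defining.class at deserialization time.
        h.r = (Runnable & Serializable) () -> System.out.println("Hello world.");
        return h;
    }
}

class Writing {
    public static void main(String... args) throws IOException {
        File f = new File(System.getProperty("java.io.tmpdir"), "x.ser");
        try (ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(f))) {
            oos.writeObject(Defining.get());
        }
        System.out.println("written to " + f);
    }
}

class Reading {
    public static void main(String... args) throws IOException, ClassNotFoundException {
        File f = new File(System.getProperty("java.io.tmpdir"), "x.ser");
        try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(f))) {
            Holder h = (Holder) ois.readObject(); // fails once Defining.class is gone
            h.r.run();
        }
    }
}
```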
Compile these four classes and run `Writing`. Then delete the class file `Defining.class` and run `Reading`. You will then get a `ClassCastException` (tested with 1.8.0_20).
The bottom line is that you may forget about this serialization issue once you understand what's happening: all you have to do to solve your problem is to make sure that the class which defined the lambda expression is also available in the runtime where the lambda is deserialized.
Example for a Spark job, to run directly from the IDE (spark-submit distributes the jar by default):
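A sketch of what that setup could look like (the class name `LineCount`, the jar path, and the URLs are placeholders; build the jar before running, so the lambda-defining class ships with it):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class LineCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("LineCount")
                .setMaster("spark://hamrah:7077") // placeholder master URL
                // Ship the jar containing the lambda-defining class to the
                // executors ourselves; spark-submit would normally do this.
                .setJars(new String[] { "/path/to/target/linecount-with-dependencies.jar" });
        JavaSparkContext sc = new JavaSparkContext(conf);
        long count = sc.textFile("hdfs://spark:54310/input/*")
                .filter(x -> true) // safe: LineCount.class is inside the shipped jar
                .count();
        System.out.println(count);
        sc.stop();
    }
}
```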
You can maybe more simply replace your Java 8 lambda with a `spark.scala.Function`: replace the lambda-based `filter` call with an explicit `Function` implementation, as sketched below.
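A sketch of that replacement applied to the `filter` call from the question (it reuses the question's `log` RDD; `spark.scala.Function` presumably refers to Spark's Java API `org.apache.spark.api.java.function.Function`):

```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

// Before (lambda; serialized as a SerializedLambda):
// JavaRDD<String> lines = log.filter(x -> {
//     return true;
// });

// After (anonymous class; serialized as an ordinary class file):
JavaRDD<String> lines = log.filter(new Function<String, Boolean>() {
    @Override
    public Boolean call(String x) {
        return true;
    }
});
```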
I suppose your problem is failed auto-boxing. In the code
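```java
x -> {
    return true;
}
```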
you pass a (`String -> boolean`) lambda (it is a `Predicate<String>`), while the `filter` method takes a (`String -> Boolean`) lambda (it is a `Function<String, Boolean>`). So I suggest changing the code to:
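A sketch of the suggested change (the original snippet isn't shown verbatim; returning the boxed `Boolean` avoids relying on auto-boxing):

```java
x -> {
    return Boolean.TRUE; // explicitly boxed, matching Function<String, Boolean>
}
```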
Please include details in your question. Output from `uname -a` and `java -version` is appreciated. Provide an SSCCE if possible.