I am trying to define a udf in spark(2.0) from a string containing scala function definition.Here is the snippet:
val universe: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe
import universe._
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox
val toolbox = currentMirror.mkToolBox()
val f = udf(toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int])
sc.parallelize(Seq("1","5")).toDF.select(f(col("value"))).show
This gives me an error :
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
However when I define the udf as :
val f = udf((s:String) => 5)
it works just fine. What is the issue here?The end objective is to take a string which has the defn of a scala function and use it as a udf.
I had the same error, and it doesn't show the ClassNotFoundException because the JavaDeserializationStream class is catching the exception, depending on your environment it is failing because it coudn't find the class you're trying to execute from your RDD/DataSet but it doesn't show the ClassNotFoundError . To fix this issue I had to generate a jar with all the classes on my project (including the function and dependencies ) and include the jar inside the spark environment
This for an standalone cluster
and this for a yarn cluster
As Giovanny observed, the problem lies in the class loaders being different (you can investigate this more by calling
.getClass.getClassLoader
on whatever object). Then, when the workers try to deserialize your reflected function, all hell breaks loose.Here is a solution that does not involve any class loader hackery. The idea is to move the reflection step to the workers. We'll end up having to redo the reflection step, but only once per worker. I think this is pretty optimal - even if you did the reflection only once on the master node, you would have to do a fair bit of work per worker to get them to recognize the function.
Then, calling
sc.parallelize(Seq("1","5")).toDF.select(f(col("value"))).show
works just fine.Feel free to comment out the
println
- it is just an easy way of counting how many times the reflection happened. Inspark-shell --master 'local'
that's only once, but inspark-shell --master 'local[2]'
it's twice.How it works
The UDF gets evaluated immediately, but it never gets used until it reaches the worker nodes, so the lazy values
toolbox
andfunc
only get evaluated on the workers. Furthermore, since they are lazy, they only ever get evaluated once per worker.