I am using an Apache Zeppelin notebook, so Spark is effectively running in interactive (REPL) mode. I can't use a closure variable here, because Zeppelin throws org.apache.spark.SparkException: Task not serializable
when it tries to serialize the whole paragraph (a larger closure).
So without the closure approach, the only option I have is to pass the map to the UDF as a column.
I have the following map, collected from a paired RDD:
final val idxMap = idxMapRdd.collectAsMap
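For context, idxMapRdd is a pair RDD of (label index, label string), so idxMap ends up as a scala.collection.Map[Double, String]. A made-up equivalent, just to show the shape (the real data comes from my pipeline):
// purely illustrative values
val idxMapRdd = sc.parallelize(Seq(0.0 -> "ClassA", 1.0 -> "ClassB", 2.0 -> "ClassC"))
idxMapRdd.collectAsMap   // Map(0.0 -> "ClassA", 1.0 -> "ClassB", 2.0 -> "ClassC")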
It is used in one of the Spark transformations below:
import org.apache.spark.sql.functions.{col, lit, map, udf}
import scala.collection.mutable.WrappedArray

def labelStr(predictions: WrappedArray[Double], idxMap: scala.collection.Map[Double, String]): Array[String] = {
  predictions.array.map(idxMap.getOrElse(_, "Other"))
}
@transient val predictionStrUDF = udf { (predictions: WrappedArray[Double], idxMap: Map[Double, String]) => labelStr(predictions, idxMap) }
val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", predictionStrUDF(col("predictions"), lit(idxMap)))
But with the lit(idxMap) call I get the following error:
java.lang.RuntimeException: Unsupported literal type class scala.collection.immutable.HashMap$HashTrieMap
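As far as I can tell, lit only handles primitive literal types. I believe newer Spark versions (2.2+) add typedLit, which supports complex literals such as maps, but I haven't been able to use it here, so this is only for reference:
import org.apache.spark.sql.functions.typedLit
// on Spark 2.2+ this should produce a MapType(DoubleType, StringType) literal column
val idxMapCol = typedLit(idxMap.toMap)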
So I tried creating a map column using the following:
val colmap = map(idxMapArr.map(lit _): _*)
But I get the following error:
<console>:139: error: type mismatch;
found : Iterable[org.apache.spark.sql.Column]
required: Seq[org.apache.spark.sql.Column]
val colmap = map(idxMapArr.map(lit _): _*)
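If I read the signature of functions.map correctly, it wants a Seq[Column] with keys and values alternating, so I think each (key, value) pair has to be flattened into two literal columns and the result converted to a Seq. Something along these lines is what I had in mind (untested sketch):
val colmap = map(idxMapArr.toSeq.flatMap { case (k, v) => Seq(lit(k), lit(v)) }: _*)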
Closure approach (for completeness):
def predictionStrUDF2(idxMapArr: scala.collection.Map[Double, String]) = {
  udf((predictions: WrappedArray[Double]) => labelStr(predictions, idxMapArr))
}
val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", predictionStrUDF2(idxMapArr)(col("predictions")))
This compiles, but when I call cvmlPredictionsStr.show
I get the error below. I think this is due to the interactive nature of Zeppelin:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:798)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:797)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:797)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1925)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1924)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2562)
at org.apache.spark.sql.Dataset.head(Dataset.scala:1924)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2139)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
... 62 elided
Caused by: java.io.NotSerializableException: com.github.fommil.netlib.F2jBLAS
Serialization stack:
- object not serializable (class: com.github.fommil.netlib.F2jBLAS, value: com.github.fommil.netlib.F2jBLAS@294770d3)
- field (class: org.apache.spark.ml.tuning.CrossValidator, name: f2jBLAS, type: class com.github.fommil.netlib.F2jBLAS)
- object (class org.apache.spark.ml.tuning.CrossValidator, cv_891fd6b7d95f)
- field (class: $iw, name: crossValidator, type: class org.apache.spark.ml.tuning.CrossValidator)
- object (class $iw, $iw@556a6aed)
- field (class: $iw, name: $iw, type: class $iw)
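From the serialization stack it looks like the closure drags in the whole REPL wrapper ($iw) of the paragraph, including the CrossValidator (whose f2jBLAS field is not serializable), even though the UDF only needs idxMapArr. One thing I considered is isolating the UDF in its own serializable object so it closes over nothing but the map, roughly like the untested sketch below, but I'm not sure this avoids the $iw capture in Zeppelin:
// untested sketch: keep the UDF factory in a standalone object so its closure
// captures only idxMapArr, not the paragraph that also holds the CrossValidator
object PredictionLabeler extends Serializable {
  import org.apache.spark.sql.functions.udf
  import scala.collection.mutable.WrappedArray

  def predictionStrUDF(idxMapArr: scala.collection.Map[Double, String]) =
    udf { (predictions: WrappedArray[Double]) =>
      predictions.array.map(idxMapArr.getOrElse(_, "Other"))
    }
}

val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", PredictionLabeler.predictionStrUDF(idxMapArr)(col("predictions")))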