Spark UDF: how to convert a Map to a column

Question:

I am using an Apache Zeppelin notebook, so Spark is effectively running in interactive mode. I can't use a closure variable here, because Zeppelin throws org.apache.spark.SparkException: Task not serializable as it tries to serialize the whole paragraph (a bigger closure).

So without the closure approach, the only option I have is to pass the map to the UDF as a column.

I have the following map, collected from a pair RDD:

final val idxMap = idxMapRdd.collectAsMap

It is used in one of the Spark transformations here:

def labelStr(predictions: WrappedArray[Double], idxMap: Map[Double, String]): Array[String] = {
    predictions.array.map(idxMap.getOrElse(_, "Other"))
}

@transient val predictionStrUDF = udf { (predictions: WrappedArray[Double], idxMap: Map[Double, String]) => labelStr(predictions, idxMap) }

val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", predictionStrUDF(col("predictions"), lit(idxMap))) 

But with the lit(idxMap) call I got the following error:

java.lang.RuntimeException: Unsupported literal type class scala.collection.immutable.HashMap$HashTrieMap
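
It appears lit only supports simple literal types. A possible workaround, assuming Spark 2.2 or later (where typedLit was added), would be a sketch like the following:

import org.apache.spark.sql.functions.typedLit

// typedLit, unlike lit, can build a MapType literal column (assumes Spark 2.2+)
val idxMapCol = typedLit(idxMap.toMap)
val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", predictionStrUDF(col("predictions"), idxMapCol))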

So I tried creating a column using the following:

val colmap = map(idxMapArr.map(lit _): _*)

But I get the following error:

<console>:139: error: type mismatch;
 found   : Iterable[org.apache.spark.sql.Column]
 required: Seq[org.apache.spark.sql.Column]
       val colmap =  map(idxMapArr.map(lit _): _*)
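
The mismatch is because map(...) takes a Seq of columns interleaved as key1, value1, key2, value2, ...; a sketch of that construction from the collected map would be:

import org.apache.spark.sql.functions.{lit, map}

// interleave key and value literals so map(...) receives a Seq[Column] of pairs
val colmap = map(idxMapArr.toSeq.flatMap { case (k, v) => Seq(lit(k), lit(v)) }: _*)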

Closure approach (for completeness):

def predictionStrUDF2(idxMapArr: scala.collection.Map[Double, String]) = {
    udf((predictions: WrappedArray[Double]) => labelStr(predictions, idxMapArr))
}
val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", predictionStrUDF2(idxMapArr)(col("predictions")))

This compiles, but when I do cvmlPredictionsStr.show I get the following. I think this is due to the interactive nature of Zeppelin:

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:798)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:797)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:797)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1925)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1924)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2562)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:1924)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2139)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
  ... 62 elided
Caused by: java.io.NotSerializableException: com.github.fommil.netlib.F2jBLAS
Serialization stack:
    - object not serializable (class: com.github.fommil.netlib.F2jBLAS, value: com.github.fommil.netlib.F2jBLAS@294770d3)
    - field (class: org.apache.spark.ml.tuning.CrossValidator, name: f2jBLAS, type: class com.github.fommil.netlib.F2jBLAS)
    - object (class org.apache.spark.ml.tuning.CrossValidator, cv_891fd6b7d95f)
    - field (class: $iw, name: crossValidator, type: class org.apache.spark.ml.tuning.CrossValidator)
    - object (class $iw, $iw@556a6aed)
    - field (class: $iw, name: $iw, type: class $iw)

Answer 1:

The question title is about Spark UDFs, but it seems the real question here is how to avoid the closure serialization problem that some interactive environments exhibit.

From your description of the problem, it sounds like the following doesn't work if executed directly in one of your notebook cells:

val x = 5
sc.parallelize(1 to 10).filter(_ > x).collect()

This is likely because x is a class member of the cell object; when the lambda captures x, it attempts to serialize the entire cell object. The cell object isn't serializable, and the result is a messy exception. This problem can be avoided with a wrapper object. Note that there is likely a slicker way to declare this wrapper (perhaps just nesting inside braces is sufficient).

object Wrapper {
    def f(): Unit = {
        // x is now local to f, so the closure captures only x,
        // not the enclosing notebook cell object
        val x = 5
        sc.parallelize(1 to 10).filter(_ > x).collect()
    }
}
Wrapper.f()
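
Applied to the UDF from the question, a minimal sketch of the same pattern (reusing the names from the question; the lookup logic is inlined so the wrapper does not capture the cell that defines labelStr) might look like:

import org.apache.spark.sql.functions.{col, udf}
import scala.collection.mutable.WrappedArray

object UdfWrapper {
    // only idxMapArr and this lambda get serialized, not the notebook cell
    // (which also holds the non-serializable CrossValidator)
    def predictionStrUDF2(idxMapArr: scala.collection.Map[Double, String]) =
        udf((predictions: WrappedArray[Double]) =>
            predictions.array.map(idxMapArr.getOrElse(_, "Other")))
}

val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", UdfWrapper.predictionStrUDF2(idxMapArr)(col("predictions")))

The important part is that everything the closure touches lives inside the object, so only the collected map is serialized rather than the whole paragraph.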

You may still have questions after resolving this issue, but currently the question touches on too many different subtopics. Another explanation of the closure serialization problem is available here.