I'm trying to write a UDF which returns a complex type:
private val toPrice = UDF1<String, Map<String, String>> { s ->
    val elements = s.split(" ")
    mapOf("value" to elements[0], "currency" to elements[1])
}
val type = DataTypes.createStructType(listOf(
    DataTypes.createStructField("value", DataTypes.StringType, false),
    DataTypes.createStructField("currency", DataTypes.StringType, false)))
df.sqlContext().udf().register("toPrice", toPrice, type)
but any time I use this:
df = df.withColumn("price", callUDF("toPrice", col("price")))
I get a cryptic error:
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$28: (string) => struct<value:string,currency:string>)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: scala.MatchError: {value=138.0, currency=USD} (of class java.util.LinkedHashMap)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:236)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:231)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:379)
... 19 more
I tried to use a custom data type:
class Price(val value: Double, val currency: String) : Serializable
with a UDF which returns that type:
private val toPrice = UDF1<String, Price> { s ->
    val elements = s.split(" ")
    Price(elements[0].toDouble(), elements[1])
}
but then I get another MatchError which complains about the Price type.
How do I properly write a UDF which can return a complex type?
TL;DR The function should return an object of class org.apache.spark.sql.Row.

Spark provides two main variants of UDF definitions.

1. udf variants using Scala reflection:

   def udf[RT](f: () ⇒ RT)(implicit arg0: TypeTag[RT]): UserDefinedFunction
   def udf[RT, A1](f: (A1) ⇒ RT)(implicit arg0: TypeTag[RT], arg1: TypeTag[A1]): UserDefinedFunction
   ...
   def udf[RT, A1, A2, ..., A10](f: (A1, A2, ..., A10) ⇒ RT)(implicit arg0: TypeTag[RT], arg1: TypeTag[A1], arg2: TypeTag[A2], ..., arg10: TypeTag[A10]): UserDefinedFunction
These variants are used without a schema, with atomics or algebraic data types; the data types are inferred automatically from the Scala closure's signature. For example, the function in question would be defined in Scala:
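A minimal sketch, assuming the input always has the "value currency" layout (Try/toOption makes malformed input come back as null):

import org.apache.spark.sql.functions.{col, udf}

case class Price(value: Double, currency: String)

// The return type (Option[Price]) is inferred and encoded automatically
val toPrice = udf((s: String) => scala.util.Try {
  s.split(" ") match {
    case Array(value, currency) => Price(value.toDouble, currency)
  }
}.toOption)

df.withColumn("price", toPrice(col("price")))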
In this variant the return type is automatically encoded.
Due to its dependence on reflection, this variant is intended primarily for Scala users.
2. udf variants providing a schema definition (the one you use here). The return type for this variant should be the same as for Dataset[Row]:
- As pointed out in the other answer, you can use only the types listed in the SQL types mapping table (atomic types, either boxed or unboxed, java.sql.Timestamp / java.sql.Date, as well as high-level collections).
- Complex structures (structs / StructTypes) are expressed using org.apache.spark.sql.Row. No mixing with algebraic data types or equivalents is allowed. For example (Scala code), a value of type struct<_1:int,_2:struct<_1:string,_2:struct<_1:double,_2:int>>> should be expressed as Row(1, Row("foo", Row(-1.0, 42))), not as nested tuples, (1, ("foo", (-1.0, 42))), or any mixed variant, like Row(1, Row("foo", (-1.0, 42))).
This variant is provided primarily to ensure Java interoperability.
In this case (equivalent to the one in question) the definition should be similar to the following one:
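A sketch equivalent to the code in the question (value is typed as DoubleType here, matching the Price class, and the closure builds a Row for the schema passed as the second argument):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("value", DoubleType, false),
  StructField("currency", StringType, false)
))

// Malformed input falls back to null instead of throwing
val toPrice = udf((s: String) => scala.util.Try {
  s.split(" ") match {
    case Array(value, currency) => Row(value.toDouble, currency)
  }
}.getOrElse(null), schema)

df.withColumn("price", toPrice(col("price")))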
Excluding all the nuances of exception handling (in general, UDFs should control for null input and, by convention, gracefully handle malformed data), the Java equivalent should look more or less like this:
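A minimal sketch using the Java UDF1 API (the same API the question calls from Kotlin); the explicit UDF1 cast gives the lambda its target type, and null/error handling is omitted:

import java.util.Arrays;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

StructType schema = DataTypes.createStructType(Arrays.asList(
    DataTypes.createStructField("value", DataTypes.DoubleType, false),
    DataTypes.createStructField("currency", DataTypes.StringType, false)));

df.sqlContext().udf().register("toPrice", (UDF1<String, Row>) s -> {
    String[] parts = s.split(" ");
    // Build a Row matching the declared schema instead of a Map
    return RowFactory.create(Double.parseDouble(parts[0]), parts[1]);
}, schema);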
Context:

To give you some context, this distinction is reflected in other parts of the API as well. For example, you can create a DataFrame from a schema and a sequence of Rows:

   def createDataFrame(rows: List[Row], schema: StructType): DataFrame

or using reflection with a sequence of Products:

   def createDataFrame[A <: Product](data: Seq[A])(implicit arg0: TypeTag[A]): DataFrame

but no mixed variants are supported.
In other words, you should provide input that can be encoded using RowEncoder.

Of course, you wouldn't normally use udf for a task like this one:
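For example, a sketch with the built-in split and struct functions (assuming the same "value currency" layout as above):

import org.apache.spark.sql.functions.{col, split, struct}

df.withColumn("price", struct(
  split(col("price"), " ").getItem(0).cast("double").alias("value"),
  split(col("price"), " ").getItem(1).alias("currency")
))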
It's simple. Go to the Data Types reference and find the corresponding type.
In Spark 2.3:

- to return a StructType, the function has to return org.apache.spark.sql.Row;
- for a Map<String, String> result, the function's return type should be MapType - clearly not what you want.
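Applied to the question's Kotlin code, a minimal sketch of the fix (keeping the question's all-StringType schema, so both fields stay strings):

import org.apache.spark.sql.Row
import org.apache.spark.sql.RowFactory
import org.apache.spark.sql.api.java.UDF1

private val toPrice = UDF1<String, Row> { s ->
    val elements = s.split(" ")
    // Build a Row matching the registered StructType instead of a Map
    RowFactory.create(elements[0], elements[1])
}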