UDF in Spark SQL DSL

Published 2019-08-09 23:59

Question:

I am trying to use the DSL rather than pure SQL in my Spark SQL jobs, but I cannot get my UDF to work.

sqlContext.udf.register("subdate",(dateTime: Long)=>dateTime.toString.dropRight(6))

This doesn't work

rdd1.toDF.join(rdd2.toDF).where("subdate(rdd1(date_time)) === subdate(rdd2(dateTime))")

I would also like to add another join condition, as in this working pure SQL query:

val results = sqlContext.sql("select * from rdd1 join rdd2 on rdd1.id = rdd2.id and subdate(rdd1.date_time) = subdate(rdd2.dateTime)")

Thanks for your help

Answer 1:

The SQL expression you pass to the where method is invalid, for at least a few reasons:

  • === is a Column method, not valid SQL equality. You should use a single equals sign =
  • bracket notation (table(column)) is not a valid way to reference a column in SQL; in this context it will be parsed as a function call. SQL uses dot notation (table.column)
  • even if it were, neither rdd1 nor rdd2 is a valid table alias

Since it looks like the column names are unambiguous, you could simply use the following code:

df1.join(df2).where("subdate(date_time) = subdate(dateTime)")
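A registered UDF can also be invoked from the DataFrame API via callUDF (available since Spark 1.5). A minimal sketch, using made-up sample frames standing in for rdd1.toDF and rdd2.toDF, and the modern SparkSession entry point:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.callUDF

val spark = SparkSession.builder().master("local[*]").appName("calludf-demo").getOrCreate()
import spark.implicits._

// Hypothetical stand-in frames mirroring the question's schemas.
val df1 = Seq((1L, 1565366399123456L)).toDF("id", "date_time")
val df2 = Seq((1L, 1565366399654321L)).toDF("id", "dateTime")

spark.udf.register("subdate", (dateTime: Long) => dateTime.toString.dropRight(6))

// Invoke the registered UDF from the DSL rather than inside a SQL string.
val joined = df1.join(df2, callUDF("subdate", $"date_time") === callUDF("subdate", $"dateTime"))
```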

If that were not the case, dot syntax would not work without providing aliases first. See, for example, Usage of spark DataFrame "as" method.
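For instance, once aliases are registered via as, dot notation becomes resolvable inside the SQL string. A sketch, where the alias names d1/d2 and the sample data are made up:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("alias-demo").getOrCreate()
import spark.implicits._

// Hypothetical stand-in frames mirroring the question's schemas.
val df1 = Seq((1L, 1565366399123456L)).toDF("id", "date_time")
val df2 = Seq((1L, 1565366399654321L)).toDF("id", "dateTime")

spark.udf.register("subdate", (dateTime: Long) => dateTime.toString.dropRight(6))

// The aliases make d1.date_time and d2.dateTime resolvable in the SQL expression.
val joined = df1.as("d1")
  .join(df2.as("d2"))
  .where("subdate(d1.date_time) = subdate(d2.dateTime)")
```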

Moreover, registering UDFs makes sense mostly when you use raw SQL all the way through. If you want to use the DataFrame API, it is better to use a UDF directly:

import org.apache.spark.sql.functions.udf
import sqlContext.implicits._  // enables the $"colName" column syntax

val subdate = udf((dateTime: Long) => dateTime.toString.dropRight(6)) 

val df1 = rdd1.toDF
val df2 = rdd2.toDF

df1.join(df2, subdate($"date_time") === subdate($"dateTime"))

or, if the column names were ambiguous:

df1.join(df2, subdate(df1("date_time")) === subdate(df2("date_time")))
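To also include the id equality from the question's SQL, the conditions can be combined with &&. A sketch, with made-up sample frames recreated so the snippet stands on its own:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().master("local[*]").appName("multi-cond-join").getOrCreate()
import spark.implicits._

// Hypothetical stand-in frames mirroring the question's schemas.
val df1 = Seq((1L, 1565366399123456L)).toDF("id", "date_time")
val df2 = Seq((1L, 1565366399654321L)).toDF("id", "dateTime")

val subdate = udf((dateTime: Long) => dateTime.toString.dropRight(6))

// DSL equivalent of: ... on rdd1.id = rdd2.id and subdate(rdd1.date_time) = subdate(rdd2.dateTime)
val joined = df1.join(
  df2,
  df1("id") === df2("id") && subdate(df1("date_time")) === subdate(df2("dateTime"))
)
```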

Finally, for a simple function like this it is better to compose built-in expressions than to create a UDF.
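For example, dropRight(6) on a non-negative Long's decimal representation is just integer division by 1,000,000, so the same join can be expressed with built-in column arithmetic and no UDF at all. A sketch, with made-up sample data:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("builtin-demo").getOrCreate()
import spark.implicits._

// Hypothetical stand-in frames mirroring the question's schemas.
val df1 = Seq((1L, 1565366399123456L)).toDF("id", "date_time")
val df2 = Seq((1L, 1565366399654321L)).toDF("id", "dateTime")

// dropRight(6) on a non-negative Long equals integer division by 10^6,
// so a cast after division replaces the UDF entirely.
val joined = df1.join(
  df2,
  (df1("date_time") / 1000000L).cast("long") === (df2("dateTime") / 1000000L).cast("long")
)
```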