I am trying to use the DataFrame DSL instead of pure SQL in my Spark SQL jobs, but I cannot get my UDF to work.
sqlContext.udf.register("subdate",(dateTime: Long)=>dateTime.toString.dropRight(6))
This doesn't work:
rdd1.toDF.join(rdd2.toDF).where("subdate(rdd1(date_time)) === subdate(rdd2(dateTime))")
I would also like to add another join condition, as in this working pure SQL query:
val results = sqlContext.sql("select * from rdd1 join rdd2 on rdd1.id = rdd2.id and subdate(rdd1.date_time) = subdate(rdd2.dateTime)")
Thanks for your help
The SQL expression you pass to the where method is incorrect for at least a few reasons:
- === is a Column method, not a valid SQL equality operator. You should use a single equality sign = instead.
- Bracket notation (table(column)) is not a valid way to reference columns in SQL. In this context it will be parsed as a function call. SQL uses dot notation (table.column).
- Even if it were, neither rdd1 nor rdd2 is a valid table alias.
Since the column names appear to be unambiguous, you could simply use the following code:
df1.join(df2).where("subdate(date_time) = subdate(dateTime)")
If they weren't, dot syntax wouldn't work without providing aliases first. See, for example, Usage of spark DataFrame "as" method.
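A minimal sketch of the alias route, which also adds the id condition from the question. It assumes Spark 2.x or later with a local SparkSession (spark.udf plays the role of sqlContext.udf here), and the sample rows are made up for illustration. Both sides have an "id" column, so each DataFrame gets an alias with as, and the SQL string uses dot notation and a single =.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: Spark 2.x+ running locally; in Spark 1.x code this corresponds to sqlContext.
val spark = SparkSession.builder().master("local[*]").appName("udf-alias-sketch").getOrCreate()
import spark.implicits._

// Same UDF as in the question, registered so it can be called inside SQL strings.
spark.udf.register("subdate", (dateTime: Long) => dateTime.toString.dropRight(6))

// Hypothetical sample data: "id" exists on both sides, so it is ambiguous without aliases.
val df1 = Seq((1, 20160115123045L)).toDF("id", "date_time")
val df2 = Seq((1, 20160115999999L), (2, 20160116000000L)).toDF("id", "dateTime")

// Alias each side, then reference columns as alias.column and compare with a single "=".
val joined = df1.as("a").join(df2.as("b"))
  .where("a.id = b.id AND subdate(a.date_time) = subdate(b.dateTime)")
```

Only the pair with id 1 matches, because both timestamps truncate to "20160115".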
Moreover, registering UDFs makes sense mostly when you use raw SQL all the way. If you want to use the DataFrame API, it is better to use the UDF directly:
import org.apache.spark.sql.functions.udf
import sqlContext.implicits._ // needed for rdd.toDF and the $"column" syntax
val subdate = udf((dateTime: Long) => dateTime.toString.dropRight(6))
val df1 = rdd1.toDF
val df2 = rdd2.toDF
df1.join(df2, subdate($"date_time") === subdate($"dateTime"))
Or, if the column names were ambiguous:
df1.join(df2, subdate(df1("date_time")) === subdate(df2("date_time")))
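The same Column-based approach also covers the extra id condition from the question: qualify the shared "id" column through its parent DataFrame and combine the predicates with &&. This is a sketch assuming Spark 2.x or later with a local SparkSession and hypothetical sample rows.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Assumption: Spark 2.x+ running locally.
val spark = SparkSession.builder().master("local[*]").appName("udf-join-sketch").getOrCreate()
import spark.implicits._

// UDF used directly as a Column function, no registration needed.
val subdate = udf((dateTime: Long) => dateTime.toString.dropRight(6))

// Hypothetical sample data.
val df1 = Seq((1, 20160115123045L), (2, 20160116000000L)).toDF("id", "date_time")
val df2 = Seq((1, 20160115999999L), (2, 20160117000000L)).toDF("id", "dateTime")

// "id" is present on both sides, so resolve it via df1(...) and df2(...).
val joined = df1.join(df2,
  df1("id") === df2("id") && subdate(df1("date_time")) === subdate(df2("dateTime")))
```

Row 2 is dropped because "20160116" differs from "20160117", so only one row survives.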
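Finally, for simple functions like this, it is better to compose built-in expressions than to create UDFs, since Spark's optimizer can reason about built-in expressions while a UDF is a black box to it.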
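One possible built-in replacement for dateTime.toString.dropRight(6) casts the column to a string and keeps everything except the last 6 characters using Column.substr with Column arguments. This is a sketch assuming Spark 2.x or later and values with at least 6 digits. The helper name subdateExpr and the sample rows are hypothetical.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{length, lit}

// Assumption: Spark 2.x+ running locally.
val spark = SparkSession.builder().master("local[*]").appName("builtin-sketch").getOrCreate()
import spark.implicits._

// Hypothetical helper: string form of the column minus its last 6 characters,
// built only from built-in expressions (no UDF).
def subdateExpr(c: Column): Column = {
  val s = c.cast("string")
  s.substr(lit(1), length(s) - 6)
}

// Hypothetical sample data.
val df1 = Seq((1, 20160115123045L), (2, 20160116000000L)).toDF("id", "date_time")
val df2 = Seq((1, 20160115999999L), (2, 20160117000000L)).toDF("id", "dateTime")

val firstDay = df1.select(subdateExpr($"date_time")).first().getString(0)

val joined = df1.join(df2,
  df1("id") === df2("id") && subdateExpr(df1("date_time")) === subdateExpr(df2("dateTime")))
```

Here firstDay is "20160115". If the values are non-negative, integer division by 1000000 is another option, and the join would then compare longs instead of strings.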