I am trying to use the DSL instead of pure SQL in my Spark SQL jobs, but I cannot get my UDF to work.
sqlContext.udf.register("subdate",(dateTime: Long)=>dateTime.toString.dropRight(6))
This doesn't work:
rdd1.toDF.join(rdd2.toDF).where("subdate(rdd1(date_time)) === subdate(rdd2(dateTime))")
I would also like to add another join condition, as in this working pure-SQL query:
val results=sqlContext.sql("select * from rdd1 join rdd2 on rdd1.id=rdd2.id and subdate(rdd1.date_time)=subdate(rdd2.dateTime)")
Thanks for your help
The SQL expression you pass to the where method is incorrect, for at least a few reasons:

- === is a Column method, not valid SQL equality; you should use a single equality sign =.
- table(column) is not a valid way to reference columns in SQL. In this context it will be parsed as a function call; SQL uses dot notation (table.column).
- Neither rdd1 nor rdd2 is a valid table alias.

Since the column names look unambiguous, you can simply reference them without any qualifier.
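A minimal sketch of the corrected where expression. The DataFrame names df1 and df2 and their column layouts are assumptions for illustration; the subdate registration is the one from the question:

```scala
// Assuming the sqlContext from the question and the registered UDF:
sqlContext.udf.register("subdate", (dateTime: Long) => dateTime.toString.dropRight(6))

// Hypothetical DataFrames standing in for rdd1.toDF and rdd2.toDF:
val df1 = rdd1.toDF("id", "date_time")
val df2 = rdd2.toDF("id", "dateTime")

// Single = instead of ===, and no table(...) qualifiers on the columns:
val joined = df1.join(df2).where("subdate(date_time) = subdate(dateTime)")
```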
If that were not the case, dot syntax wouldn't work without providing aliases first. See for example Usage of spark DataFrame "as" method.
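A sketch of what providing aliases first could look like; the alias names d1 and d2 are illustrative:

```scala
// Alias each DataFrame so dot notation inside the SQL expression has a
// table name to resolve against:
val d1 = rdd1.toDF("id", "date_time").as("d1")
val d2 = rdd2.toDF("id", "dateTime").as("d2")

d1.join(d2).where("subdate(d1.date_time) = subdate(d2.dateTime)")
```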
Moreover, registering UDFs makes sense mostly when you use raw SQL all the way. If you want to use the DataFrame API it is better to use the UDF directly; if the column names are ambiguous, qualify them through the parent DataFrames.
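A sketch of using the UDF directly through the functions API rather than by registered name (df1/df2 are the assumed DataFrames from before):

```scala
import org.apache.spark.sql.functions.udf

// Same logic as the registered UDF, but as a Column-level function:
val subdate = udf((dateTime: Long) => dateTime.toString.dropRight(6))

// Unambiguous column names ($-notation assumes import sqlContext.implicits._):
df1.join(df2, subdate($"date_time") === subdate($"dateTime"))

// If the names were ambiguous, qualify them through the parent DataFrames:
df1.join(df2, subdate(df1("date_time")) === subdate(df2("date_time")))
```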
Finally, for simple functions like this one it is better to compose built-in expressions than to create UDFs.
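For example, dropping the last six decimal digits of a positive Long is the same as integer division by 1,000,000, which built-ins can express on their own (a sketch, assuming date_time holds positive microsecond-style timestamps):

```scala
import org.apache.spark.sql.functions.floor

// Equivalent to dateTime.toString.dropRight(6) for positive values:
// drop the last six digits via integer division.
df1.join(df2, floor(df1("date_time") / 1000000) === floor(df2("dateTime") / 1000000))
```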