I know how to write a UDF in Spark SQL:
def belowThreshold(power: Int): Boolean = {
  power < -40
}
sqlContext.udf.register("belowThreshold", belowThreshold _)
Can I do something similar to define an aggregate function? How is this done?
For context, I want to run the following SQL query:
val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
FROM ifDF
WHERE opticalReceivePower IS NOT null
GROUP BY span, timestamp
ORDER BY span""")
It should return something like
Row(span1, false, T0)
I want the aggregate function to tell me if there are any values for opticalReceivePower in the groups defined by span and timestamp which are below the threshold. Do I need to write my UDAF differently from the UDF I pasted above?
Supported methods
Spark >= 2.3
Vectorized UDF (Python only): a grouped aggregate pandas_udf (declared with PandasUDFType.GROUPED_AGG) can act as a user-defined aggregate function and is used with groupBy(...).agg(...) or over an unbounded window.
Spark >= 2.0 (optionally 1.6, but with a slightly different API):
It is possible to use Aggregators on typed Datasets:
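A minimal sketch, assuming the data can be viewed as a Dataset of (group, power) tuples (the tuple layout and the -40 threshold are illustrative):

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

// "Is any value in the group below the threshold?" expressed as a generic Aggregator
class BelowThreshold[I](f: I => Boolean) extends Aggregator[I, Boolean, Boolean]
    with Serializable {
  def zero = false                                      // initial buffer value
  def reduce(acc: Boolean, x: I) = acc | f(x)           // fold one record into the buffer
  def merge(acc1: Boolean, acc2: Boolean) = acc1 | acc2 // combine partial buffers
  def finish(acc: Boolean) = acc                        // final result
  def bufferEncoder: Encoder[Boolean] = Encoders.scalaBoolean
  def outputEncoder: Encoder[Boolean] = Encoders.scalaBoolean
}

val belowThreshold = new BelowThreshold[(String, Double)](_._2 < -40).toColumn

// Usage on a typed Dataset, e.g. ds: Dataset[(String, Double)]
// ds.groupByKey(_._1).agg(belowThreshold)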
Spark >= 1.5:
In Spark 1.5 you can create a UDAF like this, although it is most likely overkill:
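A sketch following the question's threshold check (the input column name power and the buffer field isBelow are placeholders):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

object belowThreshold extends UserDefinedAggregateFunction {
  // Schema of the input arguments
  def inputSchema = new StructType().add("power", IntegerType)
  // Schema of the aggregation buffer
  def bufferSchema = new StructType().add("isBelow", BooleanType)
  // Type of the value returned by evaluate
  def dataType = BooleanType
  def deterministic = true
  // Zero value of the buffer
  def initialize(buffer: MutableAggregationBuffer) = buffer.update(0, false)
  // Fold a single input row into the buffer (similar to seqOp in aggregate)
  def update(buffer: MutableAggregationBuffer, input: Row) = {
    if (!input.isNullAt(0))
      buffer.update(0, buffer.getBoolean(0) | (input.getInt(0) < -40))
  }
  // Combine two partial buffers (similar to combOp in aggregate)
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    buffer1.update(0, buffer1.getBoolean(0) | buffer2.getBoolean(0))
  }
  // Compute the final result from the buffer
  def evaluate(buffer: Row) = buffer.getBoolean(0)
}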
Example usage:
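For example, assuming a DataFrame df with columns group and power (both names are hypothetical):

import sqlContext.implicits._

df
  .groupBy($"group")
  .agg(belowThreshold($"power").alias("belowThreshold"))
  .show

// Registering the UDAF also makes it callable from SQL,
// as in the query from the question:
sqlContext.udf.register("belowThreshold", belowThreshold)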
Spark 1.4 workaround:
I am not sure if I correctly understand your requirements but as far as I can tell plain old aggregation should be enough here:
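A sketch on made-up data: turn the per-row comparison into a 0/1 column and take the per-group maximum (column names and the -40 threshold are illustrative):

import org.apache.spark.sql.functions.max
import org.apache.spark.sql.types.{BooleanType, IntegerType}
import sqlContext.implicits._

val df = sc.parallelize(Seq(
  ("a", 0), ("a", 1), ("b", 30), ("b", -50)
)).toDF("group", "power")

df
  // 1 if the row is below the threshold, 0 otherwise
  .withColumn("belowThreshold", ($"power" < -40).cast(IntegerType))
  .groupBy($"group")
  // max is 1 (true) as soon as any row in the group was below the threshold
  .agg(max($"belowThreshold").cast(BooleanType).alias("belowThreshold"))
  .show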
Spark <= 1.4:
As far as I know, at this moment (Spark 1.4.1), there is no support for UDAFs other than the Hive ones. It should be possible with Spark 1.5 (see SPARK-3947).
Unsupported / internal methods
Internally Spark uses a number of classes including ImperativeAggregates and DeclarativeAggregates. These are intended for internal usage and may change without further notice, so it is probably not something you want to use in your production code, but just for completeness:
BelowThreshold with DeclarativeAggregate could be implemented like this (tested with Spark 2.2-SNAPSHOT):
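A sketch against the internal Catalyst API; since this API is not stable, the exact members to override depend on the Spark version, and the buffer attribute name is my own:

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
import org.apache.spark.sql.types._

case class BelowThreshold(child: Expression, threshold: Expression)
    extends DeclarativeAggregate {

  override def children: Seq[Expression] = Seq(child, threshold)
  override def nullable: Boolean = false
  override def dataType: DataType = BooleanType

  // Single boolean slot in the aggregation buffer
  private lazy val belowThreshold = AttributeReference(
    "belowThreshold", BooleanType, nullable = false
  )()

  override lazy val aggBufferAttributes = belowThreshold :: Nil

  override lazy val initialValues = Seq(Literal(false))

  // Null inputs are ignored, non-null values are compared against the threshold
  override lazy val updateExpressions = Seq(Or(
    belowThreshold,
    If(IsNull(child), Literal(false), LessThan(child, threshold))
  ))

  // Partial results are combined with a logical OR
  override lazy val mergeExpressions = Seq(
    Or(belowThreshold.left, belowThreshold.right)
  )

  override lazy val evaluateExpression = belowThreshold

  override def defaultResult: Option[Literal] = Option(Literal(false))
}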
It should be further wrapped with an equivalent of withAggregateFunction.