This question is about the duality between DataFrame and RDD when it comes to aggregation operations. In Spark SQL one can use table generating UDFs for custom aggregations, but creating one of those is typically noticeably less user-friendly than using the aggregation functions available for RDDs, especially if table output is not required.
Is there an efficient way to apply pair RDD operations such as aggregateByKey to a DataFrame which has been grouped using GROUP BY or ordered using ORDER BY? Normally, one would need an explicit map step to create key-value tuples, e.g., dataFrame.rdd.map(row => (row.getString(row.fieldIndex("category")), row)).aggregateByKey(...). Can this be avoided?
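For concreteness, a minimal sketch of that pattern, assuming a DataFrame named dataFrame with a string column "category" and a double column "value" (the column names and the averaging logic are illustrative, not part of the question):

// Compute the average "value" per "category" with pair RDD operations.
val pairs = dataFrame.rdd.map { row =>
  (row.getString(row.fieldIndex("category")), row.getDouble(row.fieldIndex("value")))
}

val averages = pairs
  .aggregateByKey((0.0, 0L))(
    (acc, v) => (acc._1 + v, acc._2 + 1L),  // fold a value into (sum, count)
    (a, b) => (a._1 + b._1, a._2 + b._2)    // merge two (sum, count) accumulators
  )
  .mapValues { case (sum, count) => sum / count }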
Not really. While DataFrames can be converted to RDDs and vice versa, this is a relatively complex operation, and methods like DataFrame.groupBy don't have the same semantics as their counterparts on RDD.
The closest thing you can get is the new Dataset API introduced in Spark 1.6.0. It provides a much closer integration with DataFrames and a GroupedDataset class with its own set of methods, including reduce, cogroup, and mapGroups:
// Assumes spark-shell, where sqlContext.implicits._ is already in scope.
case class Record(id: Long, key: String, value: Double)

val df = sc.parallelize(Seq(
  (1L, "foo", 3.0), (2L, "bar", 5.6),
  (3L, "foo", -1.0), (4L, "bar", 10.0)
)).toDF("id", "key", "value")

val ds = df.as[Record]

// For each key, keep the record with the lowest id.
ds.groupBy($"key").reduce((x, y) => if (x.id < y.id) x else y).show
// +-----+-----------+
// | _1| _2|
// +-----+-----------+
// |[bar]|[2,bar,5.6]|
// |[foo]|[1,foo,3.0]|
// +-----+-----------+
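Of the other methods, mapGroups gives full control over the per-group computation. A rough sketch against the ds defined above (the sum is just an illustrative aggregation; note that mapGroups receives each group as an iterator, so unlike reduce it cannot benefit from partial aggregation):

// Sum the values for each key.
ds.groupBy(_.key)
  .mapGroups { (key, records) =>
    (key, records.map(_.value).sum)
  }
  .show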
In some specific cases it is possible to leverage Orderable semantics to group and process data using structs or arrays. You'll find an example in SPARK DataFrame: select the first row of each group.
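A sketch of how those Orderable struct semantics can be used, against the df defined above (the column choices are assumptions; structs compare field by field, so taking the min of a struct whose first field is id selects the row with the lowest id per key):

import org.apache.spark.sql.functions.{min, struct}

df.groupBy($"key")
  .agg(min(struct($"id", $"value")).alias("first"))
  .select($"key", $"first.id", $"first.value")
  .show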