Efficient PairRDD operations on DataFrame with Spa

Posted 2019-05-08 23:05

Question:

This question is about the duality between DataFrame and RDD when it comes to aggregation operations. In Spark SQL one can use table-generating UDFs for custom aggregations, but creating one is typically noticeably less user-friendly than using the aggregation functions available for RDDs, especially if table output is not required.

Is there an efficient way to apply pair RDD operations such as aggregateByKey to a DataFrame which has been grouped using GROUP BY or ordered using ORDER BY?

Normally, one would need an explicit map step to create key-value tuples, e.g., dataFrame.rdd.map(row => (row.getString(row.fieldIndex("category")), row)).aggregateByKey(...). Can this be avoided?
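For concreteness, here is a minimal sketch of the explicit map step described above. It assumes Spark 2.x or later (SparkSession), and the object name, the sample rows and the "value" column are hypothetical, chosen only to make the example runnable.

```scala
import org.apache.spark.sql.SparkSession

object AggregateByKeySketch {
  // Sums the hypothetical "value" column per "category" via the pair RDD API.
  def sumByCategory(spark: SparkSession): Map[String, Double] = {
    import spark.implicits._

    // Hypothetical sample data standing in for the real DataFrame.
    val dataFrame = Seq(("a", 1.0), ("b", 2.0), ("a", 3.0))
      .toDF("category", "value")

    dataFrame.rdd
      // Explicit map step: build (key, Row) tuples from each Row.
      .map(row => (row.getString(row.fieldIndex("category")), row))
      // Zero value, then a within-partition fold, then a cross-partition merge.
      .aggregateByKey(0.0)(
        (acc, row) => acc + row.getDouble(row.fieldIndex("value")),
        (left, right) => left + right
      )
      .collectAsMap()
      .toMap
  }
}
```

The map step is what turns the untyped rows into the (key, value) pairs that aggregateByKey requires; the question is whether this conversion can be skipped.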

Answer 1:

Not really. While DataFrames can be converted to RDDs and vice versa, this is a relatively complex operation, and methods like DataFrame.groupBy don't have the same semantics as their counterparts on RDD.

The closest thing you can get is the new Dataset API introduced in Spark 1.6.0. It provides much closer integration with DataFrames, and its GroupedDataset class has its own set of methods, including reduce, cogroup and mapGroups:

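// Run in spark-shell (Spark 1.6), where sc and sqlContext are predefined.
import sqlContext.implicits._  // enables toDF, as[Record] and the $"..." syntax

case class Record(id: Long, key: String, value: Double)

val df = sc.parallelize(Seq(
    (1L, "foo", 3.0), (2L, "bar", 5.6),
    (3L, "foo", -1.0), (4L, "bar", 10.0)
)).toDF("id", "key", "value")

// Convert to a typed Dataset, then keep the record with the smallest id per key.
val ds = df.as[Record]
ds.groupBy($"key").reduce((x, y) => if (x.id < y.id) x else y).show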

// +-----+-----------+
// |   _1|         _2|
// +-----+-----------+
// |[bar]|[2,bar,5.6]|
// |[foo]|[1,foo,3.0]|
// +-----+-----------+
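The example above targets Spark 1.6. From Spark 2.0 onward, GroupedDataset was replaced by KeyValueGroupedDataset, typed grouping is done with groupByKey, and the reduction is called reduceGroups. A rough equivalent, assuming Spark 2.x or later and a hypothetical wrapper object, might look like this:

```scala
import org.apache.spark.sql.SparkSession

// Must be defined at top level so Spark can derive an encoder for it.
case class Record(id: Long, key: String, value: Double)

object ReduceGroupsSketch {
  // Returns, for each key, the record with the smallest id.
  def firstPerKey(spark: SparkSession): Map[String, Record] = {
    import spark.implicits._

    val ds = Seq(
      Record(1L, "foo", 3.0), Record(2L, "bar", 5.6),
      Record(3L, "foo", -1.0), Record(4L, "bar", 10.0)
    ).toDS()

    ds.groupByKey(_.key)
      // Explicit parameter types avoid ambiguity with the Java-friendly overload.
      .reduceGroups((x: Record, y: Record) => if (x.id < y.id) x else y)
      .collect()
      .toMap
  }
}
```

Like the 1.6 version, this works on typed objects rather than Rows, so no manual extraction of the key from a Row is needed.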

In some specific cases it is possible to leverage Orderable semantics to group and process data using structs or arrays. You'll find an example in "SPARK DataFrame: select the first row of each group".
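As a rough illustration of the struct-based approach, the sketch below picks the row with the smallest id per key by taking the minimum of a struct, which Spark compares field by field from left to right. It assumes Spark 2.x or later; the object name and the "first" alias are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{min, struct}

object StructOrderingSketch {
  // Returns one row per key: the one whose (id, value) struct is smallest.
  def firstRowPerKey(spark: SparkSession): DataFrame = {
    import spark.implicits._

    val df = Seq(
      (1L, "foo", 3.0), (2L, "bar", 5.6),
      (3L, "foo", -1.0), (4L, "bar", 10.0)
    ).toDF("id", "key", "value")

    df.groupBy($"key")
      // id is the first struct field, so it decides the ordering.
      .agg(min(struct($"id", $"value")).alias("first"))
      // Unpack the struct back into ordinary columns.
      .select($"key", $"first.id", $"first.value")
  }
}
```

Because the whole computation stays inside the DataFrame API, no conversion to an RDD is involved; the trade-off is that it only covers aggregations that can be phrased as a min or max over an ordered struct.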