I have a DataFrame generated as follows:
df.groupBy($"Hour", $"Category")
.agg(sum($"value") as "TotalValue")
.sort($"Hour".asc, $"TotalValue".desc)
The results look like:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
| 3| cat8| 35.6|
| ...| ....| ....|
+----+--------+----------+
As you can see, the DataFrame is ordered by Hour in increasing order, then by TotalValue in descending order.
I would like to select the top row of each group, i.e.
- from the group of Hour==0 select (0,cat26,30.9)
- from the group of Hour==1 select (1,cat67,28.5)
- from the group of Hour==2 select (2,cat56,39.6)
- and so on
So the desired output would be:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 1| cat67| 28.5|
| 2| cat56| 39.6|
| 3| cat8| 35.6|
| ...| ...| ...|
+----+--------+----------+
It might be handy to be able to select the top N rows of each group as well.
Any help is highly appreciated.
This is exactly the same as zero323's answer, but written as SQL queries.
Assuming that the DataFrame is created and registered as a temporary view:
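A minimal sketch of the registration (the view name totals is an assumed placeholder):

// register the aggregated DataFrame so it can be queried with SQL;
// "totals" is an assumed view name
df.createOrReplaceTempView("totals")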
Window function:
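A sketch of the window-function query, assuming the totals view above and an active SparkSession named spark:

// keep only the row with the highest TotalValue within each Hour
spark.sql("""
  SELECT Hour, Category, TotalValue
  FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY Hour ORDER BY TotalValue DESC) AS rn
    FROM totals
  ) ranked
  WHERE rn = 1
""").show()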
Plain SQL aggregation followed by join:
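A sketch of the aggregate-then-join query, under the same assumptions:

// compute the per-Hour maximum, then join back to recover the Category
spark.sql("""
  SELECT t.Hour, t.Category, t.TotalValue
  FROM totals t
  JOIN (
    SELECT Hour, MAX(TotalValue) AS max_value
    FROM totals
    GROUP BY Hour
  ) m
    ON t.Hour = m.Hour AND t.TotalValue = m.max_value
""").show()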
Using ordering over structs:
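A sketch of the struct-ordering query; structs compare field by field, so the maximum struct carries the Category of the top TotalValue:

spark.sql("""
  SELECT Hour, vs.Category AS Category, vs.TotalValue AS TotalValue
  FROM (
    SELECT Hour, MAX(STRUCT(TotalValue, Category)) AS vs
    FROM totals
    GROUP BY Hour
  ) tmp
""").show()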
The Dataset approach and the don't-dos are the same as in the original answer.
The solution below does only one groupBy and extracts the rows of your DataFrame that contain the max value, in one shot. No need for further joins or window functions.
Here you can do it like this:
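A sketch of that single-groupBy approach using struct ordering (imports shown; df is the aggregated DataFrame from the question):

import org.apache.spark.sql.functions.{max, struct}

// structs order by their first field, so the max struct carries the
// Category belonging to the highest TotalValue of each Hour
val topPerHour = df
  .groupBy($"Hour")
  .agg(max(struct($"TotalValue", $"Category")).as("best"))
  .select($"Hour", $"best.Category", $"best.TotalValue")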
Window functions:
Something like this should do the trick:
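A sketch using row_number over a window partitioned by Hour (variable names are assumptions):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// rank rows within each Hour by TotalValue, highest first
val w = Window.partitionBy($"Hour").orderBy($"TotalValue".desc)

val dfTop = df.withColumn("rn", row_number().over(w))
  .where($"rn" === 1)
  .drop("rn")

For the top N rows of each group, filter $"rn" <= n instead.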
This method will be inefficient in case of significant data skew.
Plain SQL aggregation followed by join:
Alternatively you can join with an aggregated data frame:
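A sketch of the join variant (the broadcast hint is optional and assumes the aggregate is small):

import org.apache.spark.sql.functions.{broadcast, max}

// per-Hour maximum, renamed to avoid column clashes in the join
val dfMax = df.groupBy($"Hour".as("max_hour"))
  .agg(max($"TotalValue").as("max_value"))

val dfTopByJoin = df.join(broadcast(dfMax),
    ($"Hour" === $"max_hour") && ($"TotalValue" === $"max_value"))
  .drop("max_hour")
  .drop("max_value")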
It will keep duplicate values (if there is more than one category per hour with the same total value). You can remove these as follows:
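A sketch of the deduplication step, keeping an arbitrary row among ties:

import org.apache.spark.sql.functions.first

dfTopByJoin
  .groupBy($"Hour")
  .agg(first("Category").as("Category"), first("TotalValue").as("TotalValue"))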
Using ordering over structs:
A neat, although not very well tested, trick which doesn't require joins or window functions:
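A sketch of the trick (names are assumptions):

import org.apache.spark.sql.functions.{max, struct}

// put the sort key first in the struct; structs compare field by field
val dfTopByStruct = df
  .select($"Hour", struct($"TotalValue", $"Category").alias("vs"))
  .groupBy($"Hour")
  .agg(max("vs").alias("vs"))
  .select($"Hour", $"vs.Category", $"vs.TotalValue")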
With the Dataset API (Spark 1.6+, 2.0+):
Spark 1.6:
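A sketch for the 1.6 GroupedDataset API; the Record case class mirroring the schema is an assumption:

case class Record(Hour: Integer, Category: String, TotalValue: Double)

// reduce each group to its record with the highest TotalValue
df.as[Record]
  .groupBy($"Hour")
  .reduce((x, y) => if (x.TotalValue > y.TotalValue) x else y)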
Spark 2.0 or later:
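A sketch using groupByKey/reduceGroups with the same Record case class:

// groups are combined map-side before the shuffle
df.as[Record]
  .groupByKey(_.Hour)
  .reduceGroups((x, y) => if (x.TotalValue > y.TotalValue) x else y)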
The last two methods can leverage map-side combine and don't require a full shuffle, so most of the time they should exhibit better performance compared to window functions and joins. They can also be used with Structured Streaming in complete output mode.

Don't use:
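A sketch of the anti-pattern, for illustration only:

import org.apache.spark.sql.functions.first

// DON'T: the sort order is not guaranteed to survive the groupBy
df.orderBy($"Hour".asc, $"TotalValue".desc)
  .groupBy($"Hour")
  .agg(first("Category").as("Category"), first("TotalValue").as("TotalValue"))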
It may seem to work (especially in local mode), but it is unreliable (SPARK-16207). Credit to Tzach Zohar for linking the relevant JIRA issue.

The same note applies to orderBy followed by dropDuplicates, which internally uses an equivalent execution plan:
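For illustration only:

// DON'T: compiles to an equivalent, equally unreliable plan
df.orderBy($"Hour".asc, $"TotalValue".desc).dropDuplicates("Hour")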
For Spark 2.0.2 with grouping by multiple columns:
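A sketch with hypothetical grouping columns col1, col2, col3, ordering by TotalValue as before:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// partition by all grouping columns; the ordering picks the "top" row
val w = Window.partitionBy($"col1", $"col2", $"col3").orderBy($"TotalValue".desc)

val refinedDF = df.withColumn("rn", row_number().over(w))
  .where($"rn" === 1)
  .drop("rn")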
We can use the rank() window function (choosing rank = 1): rank just numbers every row within a group (in this case the group would be the hour). Here's an example (from https://github.com/jaceklaskowski/mastering-apache-spark-book/blob/master/spark-sql-functions.adoc#rank):
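A sketch adapting it to this question's DataFrame (note that rank keeps ties, so an hour whose top TotalValue appears twice returns two rows; use row_number for exactly one):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rank

// rank rows within each Hour by descending TotalValue; rank 1 is the top
val byHour = Window.partitionBy($"Hour").orderBy($"TotalValue".desc)

df.withColumn("rank", rank().over(byHour))
  .where($"rank" === 1)
  .drop("rank")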