I'd like to get the top N items per key after groupByKey on an RDD, and convert the type of topNPerGroup (below) to RDD[(String, Int)], where the List[Int] values are flattened.
The data is
val data = sc.parallelize(Seq("foo"->3, "foo"->1, "foo"->2,
"bar"->6, "bar"->5, "bar"->4))
The top N items per group are computed as:
val topNPerGroup: RDD[(String, List[Int])] = data.groupByKey.map {
  case (key, numbers) => key -> numbers.toList.sortBy(-_).take(2)
}
The result is
(bar,List(6, 5))
(foo,List(3, 2))
which was printed by
topNPerGroup.collect.foreach(println)
If I can achieve this, topNPerGroup.collect.foreach(println) will generate the expected result:
(bar,6)
(bar,5)
(foo,3)
(foo,2)
Spark 1.4.0 solves the question. Take a look at https://github.com/apache/spark/commit/5e6ad24ff645a9b0f63d9c0f17193550963aa0a7, which uses BoundedPriorityQueue with aggregateByKey.
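BoundedPriorityQueue is private to Spark, but the same idea can be sketched with public APIs. A minimal sketch, assuming the question's data and n = 2, using aggregateByKey with a small sorted list standing in for the bounded queue:

val n = 2
val topN = data
  .aggregateByKey(List.empty[Int])(
    // seqOp: fold one value into a partition-local top-n list
    (acc, v) => (v :: acc).sorted(Ordering[Int].reverse).take(n),
    // combOp: merge two partition-local top-n lists
    (a, b) => (a ::: b).sorted(Ordering[Int].reverse).take(n)
  )
  // flatten the List[Int] values into (String, Int) pairs, as the question asks
  .flatMap { case (key, values) => values.map(key -> _) }

Because each accumulator never holds more than n elements, this avoids materializing a full group per key the way groupByKey does.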
Your question is a little confusing, but I think this does what you're looking for:
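A minimal sketch, building on the question's topNPerGroup, that flattens each (key, List) pair back into one pair per element:

val flattened = topNPerGroup.flatMap {
  // emit one (key, value) pair per element of the list
  case (key, numbers) => numbers.map(key -> _)
}
flattened.collect.foreach(println)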
and in the repl it prints out what you want:
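(bar,6)
(bar,5)
(foo,3)
(foo,2)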
Just use topByKey. It is also possible to provide an alternative Ordering (not required here), for example if you wanted the n smallest values; both are shown in the sketch below.
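A minimal sketch, assuming Spark 1.4.0+ and the implicit conversion from MLlib's pair-RDD helpers:

import org.apache.spark.mllib.rdd.MLPairRDDFunctions.fromPairRDD

// topByKey(n) returns RDD[(K, Array[V])] holding the n largest values per key
val topTwo = data.topByKey(2)

// For the n smallest values, pass a reversed Ordering explicitly
val bottomTwo = data.topByKey(2)(Ordering[Int].reverse)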
I've been struggling with this same issue recently, but my need was a little different in that I needed the top K values per key with a data set like
(key: Int, (domain: String, count: Long))
. While your dataset is simpler, there is still a scaling/performance issue with groupByKey, as noted in the documentation. In my case I ran into problems very quickly because my Iterable in (K, Iterable<V>) was very large, > 1 million elements, so sorting and taking the top N became very expensive and created potential memory issues. After some digging (see the references below), here is an example using combineByKey that accomplishes the same task in a way that performs and scales.
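A sketch under the assumptions above, with an input pairs: RDD[(Int, (String, Long))] ordered by count; the names K, byCount, and pairs are illustrative:

val K = 2
val byCount = Ordering.by[(String, Long), Long](_._2).reverse // largest counts first

val topKPerKey = pairs.combineByKey(
  // createCombiner: start a combiner list from the first value seen for a key
  (v: (String, Long)) => List(v),
  // mergeValue: add one value, then trim so at most K elements are ever kept
  (acc: List[(String, Long)], v: (String, Long)) => (v :: acc).sorted(byCount).take(K),
  // mergeCombiners: merge two partial top-K lists across partitions
  (a: List[(String, Long)], b: List[(String, Long)]) => (a ::: b).sorted(byCount).take(K)
)

Since the combiners never exceed K elements, no key ever materializes its full Iterable the way groupByKey would.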
And what I get in the returned RDD...
References
https://www.mail-archive.com/user@spark.apache.org/msg16827.html
https://stackoverflow.com/a/8275562/807318
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions