I have an RDD of (String, String, Int).
- I want to reduce it based on the first two strings (summing the Int values).
- Then, based on the first String, I want to group the (String, Int) pairs and sort them.
- After sorting, I need to split each group into smaller groups of n elements each.
I have written the code below. The problem is that the number of elements for a single key in step 2 is very large, and the reduceByKey((x, y) => x ++ y) takes a lot of time.
import scala.collection.mutable.ArrayBuffer

// Input
val data = Array(
("c1","a1",1), ("c1","b1",1), ("c2","a1",1),("c1","a2",1), ("c1","b2",1),
("c2","a2",1), ("c1","a1",1), ("c1","b1",1), ("c2","a1",1))
val rdd = sc.parallelize(data)
val r1 = rdd.map(x => ((x._1, x._2), (x._3)))
val r2 = r1.reduceByKey((x, y) => x + y ).map(x => ((x._1._1), (x._1._2, x._2)))
// This step takes a long time.
val r3 = r2.mapValues(x => ArrayBuffer(x)).reduceByKey((x, y) => x ++ y)
// Sort each list and split it into groups of 2.
val r4 = r3.map(x => (x._1 , x._2.toList.sorted.grouped(2).toList))
The problem is that "c1" has a lot of unique entries (b1, b2, ... millions of them), and reduceByKey is killing performance because all the values for that key go to a single node.
Is there a way to achieve this more efficiently?
// output
Array((c1,List(List((a1,2), (a2,1)), List((b1,2), (b2,1)))), (c2,List(List((a1,2), (a2,1)))))
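The three steps listed at the top of the question can be checked on a tiny input without a cluster. Below is a minimal sketch on plain Scala collections (standard library only); the object name LocalPipeline and the chunk-size parameter n are illustrative assumptions, not part of the original code. Its main method prints the same two entries as the output above.

```scala
object LocalPipeline {
  // Plain Scala collections, no Spark: the same steps as the RDD code in the question
  def pipeline(data: Seq[(String, String, Int)], n: Int): Map[String, List[List[(String, Int)]]] =
    data
      .groupBy { case (k1, k2, _) => (k1, k2) }                         // step 1: key by the first two strings...
      .toSeq                                                            // (a Seq, so equal k1 values are not collapsed below)
      .map { case ((k1, k2), rows) => (k1, (k2, rows.map(_._3).sum)) }  // ...and sum the Ints
      .groupBy(_._1)                                                    // step 2: group the (String, Int) pairs by k1
      .map { case (k1, pairs) =>
        k1 -> pairs.map(_._2).toList.sorted.grouped(n).toList          // sort each group, then step 3: chunks of n
      }

  def main(args: Array[String]): Unit = {
    val data = Seq(
      ("c1", "a1", 1), ("c1", "b1", 1), ("c2", "a1", 1), ("c1", "a2", 1), ("c1", "b2", 1),
      ("c2", "a2", 1), ("c1", "a1", 1), ("c1", "b1", 1), ("c2", "a1", 1))
    // Sort by key only so the printed order is stable
    pipeline(data, 2).toSeq.sortBy(_._1).foreach(println)
  }
}
```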
There are at least a few problems with the way you group your data. The first problem is introduced by mapValues(x => ArrayBuffer(x)).
It creates a large number of mutable objects which provide no additional value, since you cannot leverage their mutability in the subsequent reduceByKey, where each ++ creates a new collection and neither argument can be safely mutated. Since reduceByKey applies map-side aggregation, the situation is even worse and pretty much creates GC hell.
Unless you have some deeper knowledge about the data distribution that can be used to define a smarter partitioner, the simplest improvement is to replace mapValues + reduceByKey with a simple groupByKey.
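A minimal sketch of that replacement, assuming Spark's Scala RDD API (with a local master only for trying it out); GroupByKeyVersion and its pipeline helper are hypothetical names, and n is the chunk size from the question:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object GroupByKeyVersion {
  // Hypothetical helper: same result as the question's r4, without ArrayBuffer + reduceByKey(_ ++ _)
  def pipeline(rdd: RDD[(String, String, Int)], n: Int): RDD[(String, List[List[(String, Int)]])] = {
    val r2 = rdd
      .map { case (k1, k2, v) => ((k1, k2), v) }
      .reduceByKey(_ + _)                            // sum the Ints per (k1, k2)
      .map { case ((k1, k2), v) => (k1, (k2, v)) }   // re-key by the first string

    // groupByKey does no map-side combining, so no per-record buffers are
    // built and repeatedly concatenated before the shuffle
    val r3 = r2.groupByKey()

    // Sort each key's values locally and split them into chunks of n
    r3.mapValues(_.toList.sorted.grouped(n).toList)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("groupByKey-version").setMaster("local[2]"))
    val data = Array(
      ("c1", "a1", 1), ("c1", "b1", 1), ("c2", "a1", 1), ("c1", "a2", 1), ("c1", "b2", 1),
      ("c2", "a2", 1), ("c1", "a1", 1), ("c1", "b1", 1), ("c2", "a1", 1))
    pipeline(sc.parallelize(data), 2).collect().foreach(println)
    sc.stop()
  }
}
```

This still sends every value for "c1" to a single partition, so it makes that step cheaper rather than removing the skew.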
It should also be possible to use a custom partitioner for both reduceByKey calls and mapPartitions with preservesPartitioning instead of map. That way only a single shuffle is required, and groupByKey becomes a purely local operation.
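One way to sketch the single-shuffle variant, assuming Spark's Scala RDD API. FirstElementPartitioner and SingleShuffleVersion are hypothetical names: the partitioner routes a (first, second) key and a bare first key to the same partition. Because mapPartitions(..., preservesPartitioning = true) keeps that partitioner on the re-keyed RDD, groupByKey(partitioner) finds the data already partitioned and groups it within each partition instead of shuffling again.

```scala
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical partitioner: routes a pair key by its first element, any other key by
// itself, so ("c1", "b1") and "c1" land in the same partition
class FirstElementPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = {
    val routingKey = key match {
      case (first, _) => first
      case other      => other
    }
    val mod = routingKey.## % numPartitions
    if (mod < 0) mod + numPartitions else mod   // keep the partition index non-negative
  }

  // Equality lets Spark recognise an RDD that is already partitioned this way
  override def equals(other: Any): Boolean = other match {
    case p: FirstElementPartitioner => p.numPartitions == numPartitions
    case _                          => false
  }
  override def hashCode: Int = numPartitions
}

object SingleShuffleVersion {
  def pipeline(rdd: RDD[(String, String, Int)], n: Int, partitions: Int): RDD[(String, List[List[(String, Int)]])] = {
    val partitioner = new FirstElementPartitioner(partitions)

    val r2 = rdd
      .map { case (k1, k2, v) => ((k1, k2), v) }
      .reduceByKey(partitioner, _ + _)          // the only shuffle
      .mapPartitions(
        iter => iter.map { case ((k1, k2), v) => (k1, (k2, v)) },  // re-key by the first string
        preservesPartitioning = true)           // valid: k1 maps to the same partition as (k1, k2)

    // r2.partitioner == Some(partitioner), so this grouping runs inside each partition
    r2.groupByKey(partitioner)
      .mapValues(_.toList.sorted.grouped(n).toList)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("single-shuffle").setMaster("local[2]"))
    val data = Array(
      ("c1", "a1", 1), ("c1", "b1", 1), ("c2", "a1", 1), ("c1", "a2", 1), ("c1", "b2", 1),
      ("c2", "a2", 1), ("c1", "a1", 1), ("c1", "b1", 1), ("c2", "a1", 1))
    pipeline(sc.parallelize(data), 2, 4).collect().foreach(println)
    sc.stop()
  }
}
```

Setting preservesPartitioning = true is only safe here because the partitioner sends ("c1", "b1") and "c1" to the same partition; with an ordinary HashPartitioner the re-keyed records would be misplaced. All values for "c1" still end up in one partition, so this saves a shuffle but does not spread a hot key.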