Ways to replace groupByKey in Apache Spark

Published 2020-05-01 05:12

Question:

I would like to know the best way to replace the groupByKey operation with another one.

Basically, I would like to obtain an RDD[(Int, List[Measure])]. My situation:

// consider measures as an RDD of Measure objects
measures.keyBy(_.getId)
        .groupByKey

My idea is to use reduceByKey instead, because it causes less shuffling:

measures.keyBy(_.getId)
        .mapValues(List(_))
        .reduceByKey(_ ++ _)

But I think this is very inefficient, because it forces me to instantiate tons of unnecessary List objects.

Does anyone have other ideas for replacing groupByKey?

Answer 1:

Another way is to use aggregateByKey, which is designed specifically for combining values into a type different from that of the original values:

measures.keyBy(_.getId)
        .aggregateByKey(List[Measure]())(_ :+ _, _ ++ _)

This creates an empty list for each key in each partition, appends all values to these in each partition, then finally shuffles the lists to concatenate all for each key.
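
A minimal, self-contained sketch of this approach, assuming a hypothetical Measure case class that stands in for the asker's (only the getId accessor is taken from the question; the value field is invented for the example):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical stand-in for the asker's Measure class
case class Measure(id: Int, value: Double) {
  def getId: Int = id
}

val sc = new SparkContext(new SparkConf().setAppName("agg-example").setMaster("local[*]"))
val measures = sc.parallelize(Seq(Measure(1, 2.0), Measure(1, 3.0), Measure(2, 5.0)))

// zeroValue: an empty List[Measure], created per key per partition
// seqOp:     appends each value to the partition-local list
// combOp:    concatenates the per-partition lists after the shuffle
val grouped = measures
  .keyBy(_.getId)
  .aggregateByKey(List[Measure]())(_ :+ _, _ ++ _)  // RDD[(Int, List[Measure])]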

Appending to a list in Scala is O(n), so it is better to prepend, which is O(1), even though it looks a bit less clean:

measures.keyBy(_.getId)
        .aggregateByKey(List[Measure]())(_.+:(_), _ ++ _)

or:

measures.keyBy(_.getId)
        .aggregateByKey(List[Measure]())((l, v) => v +: l, _ ++ _)
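
One caveat with the prepend versions: they build each list in reverse of the order in which the values are seen. If that order matters, a single reverse per key at the end is still cheaper than O(n) appends; a minimal sketch:

measures.keyBy(_.getId)
        .aggregateByKey(List[Measure]())((l, v) => v +: l, _ ++ _)
        .mapValues(_.reverse)  // one O(n) pass per key restores the encounter order

(Keep in mind that Spark gives no ordering guarantees across a shuffle anyway, so this only matters when the partition-local order is meaningful to you.)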

This is probably more efficient than your reduceByKey example, but the situations where reduceByKey and aggregateByKey are far superior to groupByKey are those where you can first make a large reduction in data size and only shuffle the much smaller results around. In this case you don't have that reduction: the intermediate lists contain all the data you started out with, so you are still shuffling the full data set when the per-partition lists are combined (and the same holds for your reduceByKey version).
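
To make the contrast concrete, here is a hypothetical job where reduceByKey does pay off: collapsing each key to a small summary, assuming Measure has a numeric value field, so that each partition ships at most one pair per key across the network:

val sums = measures
  .map(m => (m.getId, m.value))  // `value` is an assumed numeric field on Measure
  .reduceByKey(_ + _)            // map-side combine pre-sums locally before the shuffle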

Moreover, as zero323 pointed out, groupByKey is actually more efficient in this case, because it knows it is building lists of all the data and can perform optimisations specifically for that (see the sketch after the list below):

  • It disables map-side aggregation, which avoids building a big hash map holding all the data on the map side.
  • It uses a smart buffer (CompactBuffer), which significantly reduces the number of memory allocations compared to building up immutable lists one by one.
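
So, given that the goal is an RDD[(Int, List[Measure])], plain groupByKey may well be the best option here; a minimal sketch of that path:

val grouped = measures
  .keyBy(_.getId)
  .groupByKey()          // values are gathered into a CompactBuffer per key
  .mapValues(_.toList)   // yields RDD[(Int, List[Measure])], as originally requested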

Another situation where the difference between groupByKey and reduceByKey or aggregateByKey may be minimal is when the number of keys isn't much smaller than the number of values: with only a few values per key, there is little for the map-side combine to reduce before the shuffle.