Spark: reduce/aggregate by key

2019-09-21 16:06发布

问题:

I am new to Spark and Scala, so I have no idea how this kind of problem is called (which makes searching for it pretty hard).

I have data of the following structure:

[(date1, (name1, 1)), (date1, (name1, 1)), (date1, (name2, 1)), (date2, (name3, 1))]

In some way, this has to be reduced/aggregated to:

[(date1, [(name1, 2), (name2, 1)]), (date2, [(name3, 1)])]

I know how to do reduceByKey on a list of key-value pairs, but this particular problem is a mystery to me.

Thanks in advance!

回答1:

My data, but here goes, step-wise:

val rdd1 = sc.makeRDD(Array( ("d1",("A",1)), ("d1",("A",1)), ("d1",("B",1)), ("d2",("E",1)) ),2)
val rdd2 = rdd1.map(x => ((x._1, x._2._1), x._2._2))
val rdd3 = rdd2.groupByKey
val rdd4 = rdd3.map{ 
   case (str, nums) => (str, nums.sum) 
}
val rdd5 =  rdd4.map(x => (x._1._1, (x._1._2, x._2))).groupByKey
rdd5.collect

returns:

res28: Array[(String, Iterable[(String, Int)])] = Array((d2,CompactBuffer((E,1))), (d1,CompactBuffer((A,2), (B,1))))

Better approach avoiding groupByKey is as follows:

val rdd1 = sc.makeRDD(Array( ("d1",("A",1)), ("d1",("A",1)), ("d1",("B",1)), ("d2",("E",1)) ),2)
val rdd2 = rdd1.map(x => ((x._1, x._2._1), (x._2._2))) // Need to add quotes around V part for reduceByKey
val rdd3 = rdd2.reduceByKey(_+_)
val rdd4 = rdd3.map(x => (x._1._1, (x._1._2, x._2))).groupByKey // Necessary Shuffle
rdd4.collect

As I stated in the columns it can be done with DataFrames for structured data, so run this below:

// This above should be enough.
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

val rddA = sc.makeRDD(Array( ("d1","A",1), ("d1","A",1), ("d1","B",1), ("d2","E",1) ),2)
val dfA = rddA.toDF("c1", "c2", "c3")

val dfB = dfA
   .groupBy("c1", "c2")
   .agg(sum("c3").alias("sum"))
dfB.show

returns:

+---+---+---+
| c1| c2|sum|
+---+---+---+
| d1|  A|  2|
| d2|  E|  1|
| d1|  B|  1|
+---+---+---+

But you can do this to approximate the above of the CompactBuffer above.

import org.apache.spark.sql.functions.{col, udf}

case class XY(x: String, y: Long)
val xyTuple = udf((x: String, y: Long) => XY(x, y))

val dfC = dfB
         .withColumn("xy", xyTuple(col("c2"), col("sum")))
         .drop("c2")
         .drop("sum")

dfC.printSchema
dfC.show

// Then ... this gives you the CompactBuffer answer but from a DF-perspective
val dfD = dfC.groupBy(col("c1")).agg(collect_list(col("xy")))   
dfD.show

returns - some renaming req'd and possible sorting:

---+----------------+
| c1|collect_list(xy)|
+---+----------------+
| d2|        [[E, 1]]|
| d1|[[A, 2], [B, 1]]|
+---+----------------+