How to reverse the result of reduceByKey using RDD

Posted 2019-07-28 21:16

Question:

I have an RDD of (key, value) pairs that I transformed into an RDD of (key, List(value1, value2, value3)) as follows.

val rddInit = sc.parallelize(List((1, 2), (1, 3), (2, 5), (2, 7), (3, 10)))
val rddReduced = rddInit.groupByKey.mapValues(_.toList)
rddReduced.take(3).foreach(println)

This code gives me the following RDD: (1,List(2, 3)) (2,List(5, 7)) (3,List(10))

Now I would like to get back to rddInit from the RDD I just computed (rddReduced).

My first guess is to build some kind of cross product between the key and each element of the List, like this:

import scala.collection.mutable.ListBuffer

rddReduced.map {
  case (x, y) =>
    // collect one (key, value) pair per element of the list
    val myList: ListBuffer[(Int, Int)] = ListBuffer()
    for (element <- y) {
      myList += ((x, element))
    }
    myList.toList
}.flatMap(x => x).take(5).foreach(println)

With this code, I get the initial RDD back. But I don't think using a ListBuffer inside a Spark job is good practice. Is there another way to solve this problem?

Answer 1:

It's obviously not good practice to use that kind of operation.

From what I learned in a Spark Summit course, you should use DataFrames and Datasets as much as possible; with them you benefit from many optimizations in the Spark engine.

What you want to do is called explode, and it is performed by applying the explode function from the org.apache.spark.sql.functions package.

The solution would be something like this:

 import spark.implicits._
 import org.apache.spark.sql.functions.explode
 import org.apache.spark.sql.functions.collect_list

 val dfInit = sc.parallelize(List((1, 2), (1, 3), (2, 5), (2, 7), (3, 10))).toDF("x", "y")
 val dfReduced = dfInit.groupBy("x").agg(collect_list("y") as "y")
 val dfResult = dfReduced.withColumn("y", explode($"y"))

dfResult will contain the same data as dfInit.



Answer 2:

I'm surprised no one has offered a solution with Scala's for-comprehension (that gets "desugared" to flatMap and map at compile time).

I don't use this syntax very often, but when I do...I find it quite entertaining. Some people prefer for-comprehension over a series of flatMap and map, esp. for more complex transformations.
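To make the "desugaring" concrete, here is a minimal sketch on plain Scala Lists rather than an RDD, so it runs without Spark. The object name `ForDesugarSketch` and the sample data are assumptions for illustration only, and the explicit `flatMap`/`map` form is an approximation: for a pattern on the left of `<-`, the compiler may also insert a `withFilter` call.

```scala
// Plain Scala collections stand in for the RDD here; no Spark required.
object ForDesugarSketch {
  // Hypothetical sample data, shaped like the grouped result in the question.
  val grouped: List[(Int, List[Int])] =
    List((1, List(2, 3)), (2, List(5, 7)), (3, List(10)))

  // For-comprehension form: two generators, one yield.
  val viaFor: List[(Int, Int)] =
    for {
      (k, values) <- grouped
      v <- values
    } yield (k, v)

  // Roughly the rewritten form: flatMap for the outer generator,
  // map for the innermost one.
  val viaFlatMap: List[(Int, Int)] =
    grouped.flatMap { case (k, values) => values.map(v => (k, v)) }
}
```

Both values hold the same list of (key, value) pairs, which is why the two styles are interchangeable for this kind of flattening.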

// that's what you ended up with after `groupByKey.mapValues`
val rddReduced: RDD[(Int, List[Int])] = ...
val r = for {
  (k, values) <- rddReduced
  v <- values
} yield (k, v)

scala> :type r
org.apache.spark.rdd.RDD[(Int, Int)]

scala> r.foreach(println)
(3,10)
(2,5)
(2,7)
(1,2)
(1,3)

// even nicer to our eyes
scala> r.toDF("key", "value").show
+---+-----+
|key|value|
+---+-----+
|  1|    2|
|  1|    3|
|  2|    5|
|  2|    7|
|  3|   10|
+---+-----+

After all, that's why we enjoy the flexibility of Scala, isn't it?



Answer 3:

Based on your question, I think this is what you want to do:

rddReduced.map { case (x, y) => y.map((x, _)) }.flatMap(x => x).take(5).foreach(println)

After the group-by, each value is a list, which you can map over again to pair every element with its key.



Answer 4:

Here's one way to restore the grouped RDD back to original:

val rddRestored = rddReduced.flatMap{
    case (k, v) => v.map((k, _))
  }

rddRestored.collect.foreach(println)
(1,2)
(1,3)
(2,5)
(2,7)
(3,10)
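
As a quick sanity check of the round trip, the same group-then-flatten logic can be sketched on local collections. This is only an analogue under stated assumptions: `groupBy` plus a `map` stands in for `groupByKey.mapValues(_.toList)`, no Spark is involved, and the object name `RoundTripSketch` is hypothetical. Since grouping does not guarantee key order, the comparison is done on sorted lists.

```scala
// Local-collection analogue of the group / un-group round trip; no Spark required.
object RoundTripSketch {
  val init: List[(Int, Int)] = List((1, 2), (1, 3), (2, 5), (2, 7), (3, 10))

  // Stand-in for groupByKey.mapValues(_.toList): key -> list of its values.
  val reduced: Map[Int, List[Int]] =
    init.groupBy(_._1).map { case (k, pairs) => k -> pairs.map(_._2) }

  // Same shape as the flatMap above: re-attach the key to every value.
  val restored: List[(Int, Int)] =
    reduced.toList.flatMap { case (k, vs) => vs.map(v => (k, v)) }

  // Key order is not guaranteed after grouping, so compare sorted copies.
  val sameData: Boolean = restored.sorted == init.sorted
}
```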