I have an RDD of (key, value) that I transformed into an RDD of (key, List(value1, value2, value3)) as follows.
val rddInit = sc.parallelize(List((1, 2), (1, 3), (2, 5), (2, 7), (3, 10)))
val rddReduced = rddInit.groupByKey.mapValues(_.toList)
rddReduced.take(3).foreach(println)
This code gives me the following RDD:
(1,List(2, 3))
(2,List(5, 7))
(3,List(10))
But now I would like to go back from the RDD I just computed (rddReduced) to rddInit.
My first guess is to perform some kind of cross product between the key and each element of the List, like this:
import scala.collection.mutable.ListBuffer

rddReduced.map {
  case (x, y) =>
    // rebuild the (key, value) pairs for this key
    val myList: ListBuffer[(Int, Int)] = ListBuffer()
    for (element <- y) {
      myList += ((x, element))
    }
    myList.toList
}.flatMap(x => x).take(5).foreach(println)
With this code, I get the initial RDD as a result. But I don't think using a ListBuffer inside a Spark job is good practice. Is there any other way to solve this problem?
It's obviously not good practice to use that kind of operation.
From what I learned in a Spark Summit course, you should use DataFrames and Datasets as much as possible; using them, you will benefit from a lot of optimizations from the Spark engine.
What you want to do is called explode, and it's performed by applying the explode method from the sql.functions package.
The solution would be something like this:
import spark.implicits._
import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.functions.collect_list

// group the values into a list per key, then explode that list back into rows
val dfInit = sc.parallelize(List((1, 2), (1, 3), (2, 5), (2, 7), (3, 10))).toDF("x", "y")
val dfReduced = dfInit.groupBy("x").agg(collect_list("y") as "y")
val dfResult = dfReduced.withColumn("y", explode($"y"))
dfResult will contain the same data as dfInit.
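If you want to convince yourself of that, one quick sanity check (a sketch; except is a standard Dataset operation and both frames are defined above) is:

// rows present in one frame but not the other; both counts should be 0
assert(dfResult.except(dfInit).count == 0)
assert(dfInit.except(dfResult).count == 0)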
I'm surprised no one has offered a solution with Scala's for-comprehension (which gets "desugared" to flatMap and map at compile time).
I don't use this syntax very often, but when I do... I find it quite entertaining. Some people prefer a for-comprehension over a series of flatMap and map calls, especially for more complex transformations.
// that's what you ended up with after `groupByKey.mapValues`
val rddReduced: RDD[(Int, List[Int])] = ...
val r = for {
  (k, values) <- rddReduced
  v <- values
} yield (k, v)
scala> :type r
org.apache.spark.rdd.RDD[(Int, Int)]
scala> r.foreach(println)
(3,10)
(2,5)
(2,7)
(1,2)
(1,3)
// even nicer to our eyes
scala> r.toDF("key", "value").show
+---+-----+
|key|value|
+---+-----+
| 1| 2|
| 1| 3|
| 2| 5|
| 2| 7|
| 3| 10|
+---+-----+
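For reference, the for-comprehension above desugars into roughly the following flatMap/map chain (a sketch over the same rddReduced):

// the outer generator becomes flatMap, the inner one becomes map
val rDesugared = rddReduced.flatMap {
  case (k, values) => values.map(v => (k, v))
}
// rDesugared: org.apache.spark.rdd.RDD[(Int, Int)], same contents as r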
After all, that's why we enjoy the flexibility of Scala, isn't it?
According to your question, I think this is what you want to do:
rddReduced.map { case (x, y) => y.map((x, _)) }.flatMap(identity).take(5).foreach(println)
You get a list for each key after the group-by, which you can then map over again.
Here's one way to restore the grouped RDD back to the original:
val rddRestored = rddReduced.flatMap {
  case (k, v) => v.map((k, _))
}
rddRestored.collect.foreach(println)
(1,2)
(1,3)
(2,5)
(2,7)
(3,10)
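If you want to double-check the round trip against rddInit from the question, keep in mind that groupByKey does not guarantee ordering, so compare sorted contents (a sketch, assuming rddInit is still in scope):

// collect both RDDs and compare them order-independently
val restoredSorted = rddRestored.collect.sorted
val initSorted = rddInit.collect.sorted
assert(restoredSorted.sameElements(initSorted))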