I have an RDD curRdd of the form
res10: org.apache.spark.rdd.RDD[(scala.collection.immutable.Vector[(Int, Int)], Int)] = ShuffledRDD[102]
with curRdd.collect() producing the following result:
Array((Vector((5,2)),1), (Vector((1,1)),2), (Vector((1,1), (5,2)),2))
Here the key is a vector of pairs of ints and the value is a count.
Now, I want to convert it into another RDD of the same form, RDD[(scala.collection.immutable.Vector[(Int, Int)], Int)], by percolating down the counts.
That is, (Vector((1,1), (5,2)),2) will contribute its count of 2 to any key which is a subset of it, so (Vector((5,2)),1) becomes (Vector((5,2)),3).
For the example above, our new RDD will have
(Vector((5,2)),3), (Vector((1,1)),4), (Vector((1,1), (5,2)),2)
How do I achieve this? Kindly help.
First you can introduce a subsets operation for Seq:
implicit class SubSetsOps[T](val elems: Seq[T]) extends AnyVal {
  def subsets: Vector[Seq[T]] = elems match {
    case Seq() => Vector(elems)
    case elem +: rest => {
      val recur = rest.subsets
      recur ++ recur.map(elem +: _)
    }
  }
}
The empty subset will always be the first element in the result vector, so you can omit it with .tail.
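To make the ordering concrete, here is a minimal sketch of the same recursion written as a plain function rather than an implicit class, so it can be pasted anywhere. The names SubsetsDemo, key, all and nonEmpty are made up for illustration.

```scala
object SubsetsDemo {
  // Same recursion as the subsets operation above, as a plain function.
  def subsets[T](elems: Seq[T]): Vector[Seq[T]] = elems match {
    case Seq() => Vector(elems)          // the only subset of an empty sequence is itself
    case elem +: rest =>
      val recur = subsets(rest)          // subsets that do not contain elem
      recur ++ recur.map(elem +: _)      // followed by the same subsets with elem prepended
  }

  val key: Seq[(Int, Int)] = Vector((1, 1), (5, 2))

  // The empty subset is produced first, then the non-empty ones:
  // (), ((5,2)), ((1,1)), ((1,1), (5,2))
  val all: Vector[Seq[(Int, Int)]] = subsets(key)

  // Dropping the head leaves only the non-empty subsets.
  val nonEmpty: Vector[Seq[(Int, Int)]] = all.tail
}
```

Keep in mind that a key with n elements yields 2^n subsets, so this approach is only practical for short keys.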
Now your task is a pretty obvious map-reduce, which is flatMap-reduceByKey in terms of RDD:
val result = curRdd
  .flatMap { case (keys, count) => keys.subsets.tail.map(_ -> count) }
  .reduceByKey(_ + _)
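If you want to check the logic without a Spark context, the same pipeline can be sketched on local collections. This is only an approximation: a plain Vector stands in for the RDD, and groupBy followed by a sum stands in for reduceByKey. The names PercolateDemo, data and result are hypothetical.

```scala
object PercolateDemo {
  // Plain-function version of the subsets operation, repeated so the sketch is self-contained.
  def subsets[T](elems: Seq[T]): Vector[Seq[T]] = elems match {
    case Seq() => Vector(elems)
    case elem +: rest =>
      val recur = subsets(rest)
      recur ++ recur.map(elem +: _)
  }

  // Local stand-in for curRdd.collect() from the question.
  val data: Vector[(Vector[(Int, Int)], Int)] = Vector(
    (Vector((5, 2)), 1),
    (Vector((1, 1)), 2),
    (Vector((1, 1), (5, 2)), 2)
  )

  val result: Map[Seq[(Int, Int)], Int] = data
    // emit (subset, count) for every non-empty subset of every key
    .flatMap { case (keys, count) => subsets(keys).tail.map(_ -> count) }
    // local equivalent of reduceByKey(_ + _)
    .groupBy(_._1)
    .map { case (key, pairs) => key -> pairs.map(_._2).sum }
}
```

On the sample data this gives 3 for Vector((5,2)), 4 for Vector((1,1)) and 2 for Vector((1,1), (5,2)), which matches the expected output in the question.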
Update
This implementation can introduce new sets into the result. If you would like to keep only those that were present in the original collection, you can join the result with the original:
val result = curRdd
  .flatMap { case (keys, count) => keys.subsets.tail.map(_ -> count) }
  .reduceByKey(_ + _)
  .join(curRdd map identity[(Seq[(Int, Int)], Int)])
  .map { case (key, (v, _)) => (key, v) }
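The effect of the join can be seen in a small local sketch. The input below is made up for illustration: it has no Vector((1,1)) key of its own, so the plain flatMap-reduce step invents one. Filtering by the set of original keys plays the role of the join here, and local collections again stand in for the RDD.

```scala
object FilterDemo {
  // Plain-function version of the subsets operation, repeated so the sketch is self-contained.
  def subsets[T](elems: Seq[T]): Vector[Seq[T]] = elems match {
    case Seq() => Vector(elems)
    case elem +: rest =>
      val recur = subsets(rest)
      recur ++ recur.map(elem +: _)
  }

  // Hypothetical input without a standalone Vector((1,1)) key.
  val data: Vector[(Vector[(Int, Int)], Int)] = Vector(
    (Vector((5, 2)), 1),
    (Vector((1, 1), (5, 2)), 2)
  )

  // Percolated counts, including keys that never appeared in the input.
  val percolated: Map[Seq[(Int, Int)], Int] = data
    .flatMap { case (keys, count) => subsets(keys).tail.map(_ -> count) }
    .groupBy(_._1)
    .map { case (key, pairs) => key -> pairs.map(_._2).sum }

  // Keys of the original collection, widened to Seq so they compare with the subset keys.
  val originalKeys: Set[Seq[(Int, Int)]] = data.map(_._1).toSet[Seq[(Int, Int)]]

  // Local equivalent of joining with the original and keeping the percolated value.
  val joined: Map[Seq[(Int, Int)], Int] =
    percolated.filter { case (key, _) => originalKeys.contains(key) }
}
```

Here percolated contains an extra entry for Vector((1,1)) with count 2, while joined keeps only the two original keys.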
Note that the map identity step is needed to convert the key type from Vector[_] to Seq[_] in the original RDD. You can instead modify the SubSetsOps definition, substituting all occurrences of Seq[T] with Vector[T], or make the definition generic over the collection type in the following (hardcore scala.collection) way:
import scala.collection.SeqLike
import scala.collection.generic.CanBuildFrom

implicit class SubSetsOps[T, F[e] <: SeqLike[e, F[e]]](val elems: F[T]) extends AnyVal {
  def subsets(implicit cbf: CanBuildFrom[F[T], T, F[T]]): Vector[F[T]] = elems match {
    case Seq() => Vector(elems)
    case elem +: rest => {
      val recur = rest.subsets
      recur ++ recur.map(elem +: _)
    }
  }
}
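For comparison, the simpler alternative mentioned above (replacing Seq[T] with Vector[T]) could look roughly like the sketch below. It is written as a plain function inside a hypothetical VectorSubsetsDemo object rather than as the implicit class. Because the subsets keep the Vector type, the keys already match the key type of the original RDD and no map identity step would be needed before the join.

```scala
object VectorSubsetsDemo {
  // Vector-only variant: both the input and every subset are Vectors.
  def subsets[T](elems: Vector[T]): Vector[Vector[T]] = elems match {
    case Vector() => Vector(elems)
    case elem +: rest =>
      val recur = subsets(rest)
      recur ++ recur.map(elem +: _)
  }

  // Non-empty subsets of a sample key, typed as Vector[Vector[(Int, Int)]].
  val nonEmpty: Vector[Vector[(Int, Int)]] =
    subsets(Vector((1, 1), (5, 2))).tail
}
```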