subsets manipulation on vectors in spark scala

Published 2019-08-11 11:50

Question:

I have an RDD curRdd of the form

res10: org.apache.spark.rdd.RDD[(scala.collection.immutable.Vector[(Int, Int)], Int)] = ShuffledRDD[102]

with curRdd.collect() producing the following result.

Array((Vector((5,2)),1), (Vector((1,1)),2), (Vector((1,1), (5,2)),2))

Here the key is a vector of pairs of ints, and the value is a count.

Now, I want to convert it into another RDD of the same form RDD[(scala.collection.immutable.Vector[(Int, Int)], Int)] by percolating down the counts.

That is, (Vector((1,1), (5,2)), 2) will contribute its count of 2 to any key that is a subset of it, so (Vector((5,2)), 1) becomes (Vector((5,2)), 3).

For the example above, our new RDD will have

(Vector((5,2)),3), (Vector((1,1)),4), (Vector((1,1), (5,2)),2)

How do I achieve this? Kindly help.

Answer 1:

First, you can introduce a subsets operation for Seq:

implicit class SubSetsOps[T](val elems: Seq[T]) extends AnyVal {
  def subsets: Vector[Seq[T]] = elems match {
    case Seq() => Vector(elems)
    case elem +: rest =>
      val recur = rest.subsets
      recur ++ recur.map(elem +: _)
  }
}

The empty subset will always be the first element in the result vector, so you can omit it with .tail.
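To make that ordering concrete, here is a small self-contained check of the subsets operation on the key from the example (the value-class wrapper is dropped here, since it is only an allocation optimization):

```scala
implicit class SubSetsOps[T](val elems: Seq[T]) {
  def subsets: Vector[Seq[T]] = elems match {
    case Seq() => Vector(elems)
    case elem +: rest =>
      val recur = rest.subsets
      recur ++ recur.map(elem +: _)
  }
}

// subsets of Vector((1,1), (5,2)): the empty subset comes first,
// then Vector((5,2)), Vector((1,1)), and the full vector itself
val subs = Vector((1, 1), (5, 2)).subsets
println(subs)
```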

Now your task is a pretty obvious map-reduce, which is flatMap + reduceByKey in terms of RDDs:

val result = curRdd
  .flatMap { case (keys, count) => keys.subsets.tail.map(_ -> count) }
  .reduceByKey(_ + _)
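Because reduceByKey only sums the counts per key, the pipeline can be sanity-checked locally without a SparkContext, using groupBy over a plain Seq in place of reduceByKey (a sketch of the logic, not the Spark code itself):

```scala
implicit class SubSetsOps[T](val elems: Seq[T]) {
  def subsets: Vector[Seq[T]] = elems match {
    case Seq() => Vector(elems)
    case elem +: rest =>
      val recur = rest.subsets
      recur ++ recur.map(elem +: _)
  }
}

// the data from the question, as a local collection
val curData = Seq(
  (Vector((5, 2)), 1),
  (Vector((1, 1)), 2),
  (Vector((1, 1), (5, 2)), 2)
)

val result = curData
  .flatMap { case (keys, count) => keys.subsets.tail.map(_ -> count) }
  .groupBy(_._1)                                 // stands in for reduceByKey
  .map { case (k, vs) => (k, vs.map(_._2).sum) } // sum the counts per key
```

This reproduces the expected counts from the question: 3 for Vector((5,2)), 4 for Vector((1,1)), and 2 for the full vector.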

Update

This implementation could introduce new sets in the result. If you would like to keep only those that were present in the original collection, you can join the result with the original:

val result = curRdd
  .flatMap { case (keys, count) => keys.subsets.tail.map(_ -> count) }
  .reduceByKey(_ + _)
  .join(curRdd map identity[(Seq[(Int, Int)], Int)])
  .map { case (key, (v, _)) => (key, v) }
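To see why the join matters, consider an original collection that holds only the full vector: percolation then creates subset keys that never existed in the input, and joining with the original drops them. A local sketch, with plain Maps standing in for the RDDs:

```scala
// only the full vector is present in the original data
val original = Map(Vector((1, 1), (5, 2)) -> 2)

// after flatMap + reduceByKey, every non-empty subset carries the count 2
val percolated = Map(
  Vector((5, 2)) -> 2,
  Vector((1, 1)) -> 2,
  Vector((1, 1), (5, 2)) -> 2
)

// keep only keys that exist in the original,
// mirroring .join(curRdd).map { case (key, (v, _)) => (key, v) }
val joined = percolated.filter { case (k, _) => original.contains(k) }
```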

Note that the map identity step is needed to convert the key type from Vector[_] to Seq[_] in the original RDD. You can instead modify the SubSetsOps definition, substituting all occurrences of Seq[T] with Vector[T], or generalize it over the collection type (via scala.collection) as follows. (Note that CanBuildFrom was removed in Scala 2.13, so this generic version compiles only on Scala 2.12 and earlier.)

import scala.collection.SeqLike
import scala.collection.generic.CanBuildFrom

implicit class SubSetsOps[T, F[e] <: SeqLike[e, F[e]]](val elems: F[T]) extends AnyVal {
  def subsets(implicit cbf: CanBuildFrom[F[T], T, F[T]]): Vector[F[T]] = elems match {
    case Seq() => Vector(elems)
    case elem +: rest =>
      val recur = rest.subsets
      recur ++ recur.map(elem +: _)
  }
}
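For completeness, here is the other option mentioned above, substituting Seq[T] with Vector[T]. It keeps the key type as Vector without any CanBuildFrom machinery, and so also compiles on Scala 2.13+ (the value-class wrapper is again dropped):

```scala
implicit class VectorSubSetsOps[T](val elems: Vector[T]) {
  def subsets: Vector[Vector[T]] = elems match {
    case Vector() => Vector(elems)
    case elem +: rest =>
      val recur = rest.subsets
      recur ++ recur.map(elem +: _)
  }
}

// keys now stay Vector[(Int, Int)], so no map identity step is needed
val subs = Vector((1, 1), (5, 2)).subsets
```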