Operate on neighbor elements in RDD in Spark

Posted 2020-01-28 09:34

Question:

Suppose I have a collection:

List(1, 3, -1, 0, 2, -4, 6)

It's easy to sort it:

List(-4, -1, 0, 1, 2, 3, 6)

Then I can construct a new collection by computing 6 - 3, 3 - 2, 2 - 1, 1 - 0, and so on, like this:

val list = List(-4, -1, 0, 1, 2, 3, 6)
for (i <- 0 to list.length - 2) yield {
  list(i + 1) - list(i)
}

and get a vector:

Vector(3, 1, 1, 1, 1, 3)

That is, I want to subtract each element from the one that follows it.

But how can I implement this on an RDD in Spark?

I know that for the collection:

List(-4, -1, 0, 1, 2, 3, 6)

there will be several partitions, each of which is ordered. Can I perform a similar operation within each partition and then combine the per-partition results?

Answer 1:

The most efficient solution is to use the sliding method from MLlib's RDDFunctions, which handles partition boundaries for you:

import org.apache.spark.mllib.rdd.RDDFunctions._

val rdd = sc.parallelize(Seq(1, 3, -1, 0, 2, -4, 6))
  .sortBy(identity)                  // sort ascending
  .sliding(2)                        // all pairs of consecutive elements
  .map { case Array(x, y) => y - x } // difference within each pair
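Collecting this (assuming a live SparkContext named sc) should yield the expected differences; on Scala 2.12, for example:

println(rdd.collect.toSeq)
// WrappedArray(3, 1, 1, 1, 1, 3)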


Answer 2:

Suppose you have something like

val seq = sc.parallelize(List(1, 3, -1, 0, 2, -4, 6)).sortBy(identity)

Let's first create a collection with the index as key, as Ton Torres suggested:

val original = seq.zipWithIndex.map(_.swap)
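For illustration, original now keys each sorted value by its position; its contents should look roughly like:

original.collect.foreach(println)
// (0,-4)
// (1,-1)
// (2,0)
// (3,1)
// (4,2)
// (5,3)
// (6,6)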

Now we can build a collection shifted by one element:

val shifted = original.map { case (idx, v) => (idx - 1, v) }.filter(_._1 >= 0)
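After this shift, key i holds the element that originally sat at position i + 1 (the pair that would get key -1 is filtered out); roughly:

shifted.collect.foreach(println)
// (0,-1)
// (1,0)
// (2,1)
// (3,2)
// (4,3)
// (5,6)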

Next we can calculate the needed differences, ordered by descending index:

val diffs = original.join(shifted)
  .sortBy(_._1, ascending = false)
  .map { case (idx, (v1, v2)) => v2 - v1 }

So

 println(diffs.collect.toSeq)

shows

WrappedArray(3, 1, 1, 1, 1, 3)

Note that you can skip the sortBy step if the reversed order is not important.
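In that case a sketch of the same computation without the descending sort might look like:

val unordered = original.join(shifted)
  .map { case (_, (v1, v2)) => v2 - v1 }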

Also note that for a local collection this could be computed much more simply:

val elems = List(1, 3, -1, 0, 2, -4, 6).sorted
(elems.tail, elems).zipped.map(_ - _).reverse
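On Scala 2.13, where the zipped method is deprecated, an equivalent sketch would use lazyZip:

(elems.tail lazyZip elems).map(_ - _).reverse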

But in the case of an RDD, the zip method requires that both RDDs have the same number of elements in each partition. So if you were to implement tail like

val tail = seq.zipWithIndex().filter(_._2 > 0).map(_._1)  

then tail.zip(seq) would not work, since both RDDs need an equal number of elements in each partition, and here each partition has one element that would have to travel to the previous partition.
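A minimal sketch of that failure (the exact exception message may vary across Spark versions):

tail.zip(seq).collect()
// throws org.apache.spark.SparkException:
//   Can only zip RDDs with same number of elements in each partition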