I have a collection:
List(1, 3, -1, 0, 2, -4, 6)
It is easy to sort it:
List(-4, -1, 0, 1, 2, 3, 6)
Then I can construct a new collection by computing 6 - 3, 3 - 2, 2 - 1, 1 - 0, and so on, like this:
for (i <- 0 to list.length - 2) yield {
  list(i + 1) - list(i)
}
and get a vector:
Vector(3, 1, 1, 1, 1, 3)
That is, I want to subtract each element from the one that follows it.
But how can I implement this with an RDD on Spark?
I know that for the collection:
List(-4, -1, 0, 1, 2, 3, 6)
the data will be split into several partitions, and each partition is ordered. Can I perform a similar operation on each partition and then combine the per-partition results?
The most efficient solution is to use the sliding method:
import org.apache.spark.mllib.rdd.RDDFunctions._
// Sort, pair each element with its successor, then subtract
val rdd = sc.parallelize(Seq(1, 3, -1, 0, 2, -4, 6))
  .sortBy(identity)
  .sliding(2)
  .map { case Array(x, y) => y - x }
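For intuition, here is a minimal local sketch of what the sliding(2) step computes, using a plain Scala List instead of an RDD. The object name SlidingSketch is only illustrative, and on a local List each window is a List rather than an Array, so the pattern differs slightly from the Spark version:

```scala
object SlidingSketch {
  val sorted: List[Int] = List(1, 3, -1, 0, 2, -4, 6).sorted

  // Each window holds two adjacent elements: (current, next)
  val diffs: List[Int] =
    sorted.sliding(2).collect { case List(x, y) => y - x }.toList
}
```

SlidingSketch.diffs evaluates to List(3, 1, 1, 1, 1, 3), the same values as in the question.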
Suppose you have something like
val seq = sc.parallelize(List(1, 3, -1, 0, 2, -4, 6)).sortBy(identity)
First, let's create a collection keyed by index, as Ton Torres suggested:
val original = seq.zipWithIndex.map(_.swap)
Now we can build a collection shifted by one element:
val shifted = original.map { case (idx, v) => (idx - 1, v) }.filter(_._1 >= 0)
Next, we can calculate the required differences, ordered by descending index:
val diffs = original.join(shifted)
  .sortBy(_._1, ascending = false)
  .map { case (idx, (v1, v2)) => v2 - v1 }
So
println(diffs.collect.toSeq)
shows
WrappedArray(3, 1, 1, 1, 1, 3)
Note that you can skip the sortBy step if the order of the result is not critical; a join does not guarantee any particular ordering of its output.
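To make the intermediate values easier to follow, the index-and-join steps can be mirrored on a local List. This is only a sketch under the assumption that a Map lookup stands in for the RDD join; IndexJoinSketch and its members are illustrative names, not Spark API:

```scala
object IndexJoinSketch {
  val sorted: List[Int] = List(1, 3, -1, 0, 2, -4, 6).sorted

  // (index, value) pairs, like seq.zipWithIndex.map(_.swap)
  val original: List[(Int, Int)] = sorted.zipWithIndex.map(_.swap)

  // Move every value one index down: index i now holds the value from i + 1
  val shifted: Map[Int, Int] =
    original.map { case (idx, v) => (idx - 1, v) }.filter(_._1 >= 0).toMap

  // Inner join on index: only indices present on both sides survive
  val joined: List[(Int, (Int, Int))] =
    original.flatMap { case (idx, v1) =>
      shifted.get(idx).map(v2 => (idx, (v1, v2)))
    }

  // Descending index order, then next element minus current element
  val diffs: List[Int] =
    joined.sortBy(_._1)(Ordering[Int].reverse).map { case (_, (v1, v2)) => v2 - v1 }
}
```

The last index has no partner in shifted, so joined has six entries for seven input elements.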
Also note that for a local collection this can be computed much more simply:
val elems = List(1, 3, -1, 0, 2, -4, 6).sorted
(elems.tail, elems).zipped.map(_ - _).reverse
But for an RDD, the zip method requires both collections to contain the same number of elements in each partition. So if you implemented tail like
val tail = seq.zipWithIndex().filter(_._2 > 0).map(_._1)
tail.zip(seq)
it would not work, since both collections need the same number of elements in each partition, and here one element from each partition would have to move to the previous partition.
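The boundary problem can be illustrated without Spark by modelling partitions as nested lists. This is a rough sketch that assumes the sorted data happens to be split into chunks of three; PartitionBoundarySketch and its members are hypothetical names, and the code is a local model rather than a description of Spark's internals:

```scala
object PartitionBoundarySketch {
  // Pretend the sorted data landed in three partitions of sizes 3, 3, 1
  val partitions: List[List[Int]] =
    List(1, 3, -1, 0, 2, -4, 6).sorted.grouped(3).toList

  // Dropping the global first element shrinks only the first partition,
  // so per-partition sizes no longer match (3, 3, 1 versus 2, 3, 1)
  val tailPartitions: List[List[Int]] = partitions.head.drop(1) :: partitions.tail
  val sizes: List[Int] = partitions.map(_.size)
  val tailSizes: List[Int] = tailPartitions.map(_.size)

  def localDiffs(xs: List[Int]): List[Int] =
    xs.zip(xs.drop(1)).map { case (x, y) => y - x }

  // Differencing each partition on its own loses the pairs that span a boundary
  val naive: List[Int] = partitions.flatMap(localDiffs)

  // One possible repair: append the head of the next partition to each partition
  val nextHeads: List[List[Int]] = partitions.drop(1).map(_.take(1)) :+ Nil
  val fixed: List[Int] =
    partitions.zip(nextHeads).flatMap { case (part, next) => localDiffs(part ++ next) }
}
```

Here naive contains only four differences because the pairs (0, 1) and (3, 6) straddle partition boundaries, while fixed recovers all six.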