Is there a better way to do a reduce operation on RDD[Array[Double]]?

Posted 2019-07-20 19:24

Question:

I want to reduce an RDD[Array[Double]] so that each element of an array is added to the corresponding element of the next array. This is the code I am using at the moment:

var rdd1: RDD[Array[Double]] = ...

var coord = rdd1.reduce((x, y) => (x, y).zipped.map(_ + _))

Is there a better, more efficient way to do this? It is currently very costly.

Answer 1:

Using zipped.map is very inefficient, because it creates a lot of temporary objects and boxes the doubles.
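
For comparison, the same elementwise sum can be written as a plain loop over the primitive arrays, which avoids both the boxing and the intermediate collections (a minimal sketch; addArrays is just an illustrative name, not something from the original answer):

def addArrays(x: Array[Double], y: Array[Double]): Array[Double] = {
  // assumes both arrays have the same length
  val out = new Array[Double](x.length)
  var i = 0
  while (i < x.length) {
    out(i) = x(i) + y(i) // primitive addition, no boxing
    i += 1
  }
  out
}

var coord = rdd1.reduce(addArrays _)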

If you use spire, you can just do this:

> import spire.implicits._
> val rdd1 = sc.parallelize(Seq(Array(1.0, 2.0), Array(3.0, 4.0)))
> var coord = rdd1.reduce( _ + _)
res1: Array[Double] = Array(4.0, 6.0)

This is much nicer to look at, and should also be much more efficient.

Spire is a dependency of Spark, so you should be able to do the above without any extra dependencies. At least it worked in the spark-shell for Spark 1.3.1 here.

This will work for any array where there is an AdditiveSemigroup typeclass instance available for the element type. In this case, the element type is Double. Spire typeclasses are @specialized for double, so there will be no boxing going on anywhere.
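
The same idea can be spelled out in your own code: any element type with an AdditiveSemigroup instance (plus a ClassTag, so the result array can be constructed) supports this kind of elementwise addition. A small generic sketch using Spire's syntax import; this is illustrative, not the mechanism Spire itself uses for Array[Double]:

import scala.reflect.ClassTag
import spire.algebra.AdditiveSemigroup
import spire.implicits._

// elementwise `+` for any element type with an AdditiveSemigroup instance
// (assumes x and y have the same length)
def addElementwise[A: AdditiveSemigroup: ClassTag](x: Array[A], y: Array[A]): Array[A] =
  Array.tabulate(x.length)(i => x(i) + y(i))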

If you really want to know what is going on to make this work, you have to use reify:

> import scala.reflect.runtime.{universe => u}
> val a = Array(1.0, 2.0)
> val b = Array(3.0, 4.0)
> u.reify { a + b }

res5: reflect.runtime.universe.Expr[Array[Double]] = Expr[scala.Array[Double]](
  implicits.additiveSemigroupOps(a)(
    implicits.ArrayNormedVectorSpace(
      implicits.DoubleAlgebra, 
      implicits.DoubleAlgebra,
      Predef.this.implicitly)).$plus(b))

So the addition works because there is an instance of AdditiveSemigroup for Array[Double].



Answer 2:

I assume the concern is that you have very large Array[Double]s and that the transformation as written does not distribute their addition. If so, you could do something like this (untested):

// map each Array[Double] to (index, value) pairs
val rdd2 = rdd1.flatMap(a => a.zipWithIndex.map(t => (t._2, t._1)))
// get the sum for each index
val reduced = rdd2.reduceByKey(_ + _)
// key everything the same so groupByKey yields a single iterable
val groupAll = reduced.map(t => ("constKey", (t._1, t._2)))
// get the doubles back together into an array, ordered by index
val coord = groupAll.groupByKey().map { case (_, vs) =>
  vs.toList.sortBy(_._1).map(_._2).toArray
}
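
Note that coord here is still an RDD containing a single element (everything ends up under the one "constKey" key), so to get the combined Array[Double] back on the driver you would do something like (again untested):

val result: Array[Double] = coord.first()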