Spark RDD: How to calculate statistics most effici

2019-02-26 09:33发布

问题:

Assuming the existence of an RDD of tuples similar to the following:

(key1, 1)
(key3, 9)
(key2, 3)
(key1, 4)
(key1, 5)
(key3, 2)
(key2, 7)
...

What is the most efficient (and, ideally, distributed) way to compute statistics corresponding to each key? (At the moment, I am looking to calculate standard deviation / variance, in particular.) As I understand it, my options amount to:

  1. Use the colStats function in MLLib: This approach has the advantage of easily-adaptable to use other mllib.stat functions later, if other statistical computations are deemed necessary. However, it operates on an RDD of Vector containing the data for each column, so as I understand it, this approach would require that the full set of values for each key be collected on a single node, which would seem non-ideal for large data sets. Does a Spark Vector always imply that the data in the Vector be resident locally, on a single node?
  2. Perform a groupByKey, then stats: Likely shuffle-heavy, as a result of the groupByKey operation.
  3. Perform aggregateByKey, initializing a new StatCounter, and using StatCounter::merge as the sequence and combiner functions: This is the approach recommended by this StackOverflow answer, and avoids the groupByKey from option 2. However, I haven't been able to find good documentation for StatCounter in PySpark.

I like Option 1 because it makes the code more extensible, in that it could easily accommodate more complicated calculations using other MLLib functions with similar contracts, but if the Vector inputs inherently require that the data sets be collected locally, then it limits the data sizes on which the code can effectively operate. Between the other two, Option 3 looks more efficient because it avoids the groupByKey, but I was hoping to confirm that that is the case.

Are there any other options I haven't considered? (I am currently using Python + PySpark, but I'm open to solutions in Java/Scala as well, if there is a language difference.)

回答1:

You can try reduceByKey. It's pretty straightforward if we only want to compute the min():

rdd.reduceByKey(lambda x,y: min(x,y)).collect()
#Out[84]: [('key3', 2.0), ('key2', 3.0), ('key1', 1.0)]

To calculate the mean, you'll first need to create (value, 1) tuples which we use to calculate both the sum and count in the reduceByKey operation. Lastly we divide them by each other to arrive at the mean:

meanRDD = (rdd
           .mapValues(lambda x: (x, 1))
           .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
           .mapValues(lambda x: x[0]/x[1]))

meanRDD.collect()
#Out[85]: [('key3', 5.5), ('key2', 5.0), ('key1', 3.3333333333333335)]

For the variance, you can use the formula (sumOfSquares/count) - (sum/count)^2, which we translate in the following way:

varRDD = (rdd
          .mapValues(lambda x: (1, x, x*x))
          .reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1], x[2]+y[2]))
          .mapValues(lambda x: (x[2]/x[0] - (x[1]/x[0])**2)))

varRDD.collect()
#Out[106]: [('key3', 12.25), ('key2', 4.0), ('key1', 2.8888888888888875)]

I used values of type double instead of int in the dummy data to accurately illustrate computing the average and variance:

rdd = sc.parallelize([("key1", 1.0),
                      ("key3", 9.0),
                      ("key2", 3.0),
                      ("key1", 4.0),
                      ("key1", 5.0),
                      ("key3", 2.0),
                      ("key2", 7.0)])