Spark: difference of semantics between reduce and

Posted 2019-03-18 16:13

Question:

Spark's documentation says that the RDD method reduce requires an associative AND commutative binary function.

However, the method reduceByKey ONLY requires an associative binary function.
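
To make the two properties concrete, here is a minimal plain-Python sketch (no Spark needed) that tests them over a handful of sample values. The helper names `is_associative` and `is_commutative` are hypothetical, and a check over samples can only refute a property, never prove it in general.

```python
from itertools import product
from operator import add


def is_associative(f, samples):
    """True if f(f(a, b), c) == f(a, f(b, c)) for every triple of samples."""
    return all(f(f(a, b), c) == f(a, f(b, c))
               for a, b, c in product(samples, repeat=3))


def is_commutative(f, samples):
    """True if f(a, b) == f(b, a) for every pair of samples."""
    return all(f(a, b) == f(b, a) for a, b in product(samples, repeat=2))


strings = ["a", "b", "c"]
numbers = [1, 2, 3]

# String concatenation (what `_ + _` does on lines of text) is associative
# but not commutative: "a" + "b" differs from "b" + "a".
concat_props = (is_associative(add, strings), is_commutative(add, strings))

# Integer addition has both properties on these samples.
sum_props = (is_associative(add, numbers), is_commutative(add, numbers))

print(concat_props, sum_props)
```

String concatenation is therefore a convenient probe: any change in the order in which partial results are merged shows up directly in the output.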

I did some tests, and apparently that is the behavior I get. Why this difference? Why does reduceByKey ensure the binary function is always applied in a certain order (to accommodate the lack of commutativity) when reduce does not?

For example, if I load a (small) text file with 4 partitions (minimum):

val r = sc.textFile("file4k", 4)

then:

r.reduce(_ + _)

returns a string where parts are not always in the same order, whereas:

r.map(x => (1,x)).reduceByKey(_ + _).first

always returns the same string (where everything is in the same order as in the original file).

(I checked with r.glom, and the file content is indeed spread over 4 partitions; there is no empty partition.)

Answer 1:

As far as I am concerned, this is an error in the documentation, and the results you see are simply incidental. Practice, other resources, and a simple analysis of the code show that the function passed to reduceByKey should be not only associative but commutative as well.

  • practice - while it looks like the order is preserved in local mode, this is no longer true when you run Spark on a cluster, including standalone mode.

  • other resources - to quote Data Exploration Using Spark from AmpCamp 3:

    There is a convenient method called reduceByKey in Spark for exactly this pattern. Note that the second argument to reduceByKey determines the number of reducers to use. By default, Spark assumes that the reduce function is commutative and associative and applies combiners on the mapper side.

  • code - reduceByKey is implemented using combineByKeyWithClassTag and creates a ShuffledRDD. Since Spark doesn't guarantee the order after shuffling, the only way to restore it would be to attach some metadata to the partially reduced records. As far as I can tell, nothing like this takes place.
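
The argument in the last point can be illustrated without a cluster. The following plain-Python sketch is a simplified model, not Spark's implementation: `simulate_reduce_by_key` is a hypothetical helper that combines values per key inside each partition (the map side), then merges the partial results in a given arrival order, standing in for a shuffle that gives no ordering guarantee.

```python
from itertools import permutations


def simulate_reduce_by_key(partitions, f, arrival_order):
    """Model a map-side combine followed by an unordered shuffle.

    partitions: list of lists of (key, value) pairs.
    arrival_order: order in which per-partition partials reach the reducer.
    """
    # Map side: combine values per key inside each partition, in local order.
    partials = []
    for part in partitions:
        combined = {}
        for key, value in part:
            combined[key] = f(combined[key], value) if key in combined else value
        partials.append(combined)

    # Reduce side: merge the partial results in whatever order they arrive.
    result = {}
    for idx in arrival_order:
        for key, value in partials[idx].items():
            result[key] = f(result[key], value) if key in result else value
    return result


def plus(x, y):
    return x + y


# Four partitions holding the "lines" of a small file, all under key 1.
partitions = [[(1, "a"), (1, "b")], [(1, "c")], [(1, "d"), (1, "e")], [(1, "f")]]
length_partitions = [[(k, len(v)) for k, v in part] for part in partitions]
orders = list(permutations(range(len(partitions))))

# Non-commutative function: the result depends on the arrival order.
concat_results = {simulate_reduce_by_key(partitions, plus, o)[1] for o in orders}

# Commutative and associative function: the arrival order is irrelevant.
length_results = {simulate_reduce_by_key(length_partitions, plus, o)[1] for o in orders}

print(len(concat_results), length_results)
```

In this model the original file order comes back only when the partials happen to arrive in partition order. That matches what the question saw in local mode, and it is not something to rely on.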

On a side note, reduce as implemented in PySpark will work just fine with a function that is only associative, since the per-partition results are collected in partition order before being merged on the driver. That is, of course, just an implementation detail and not part of the contract.
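
As a rough model of that implementation detail, the sketch below folds each partition locally and then folds the partial results in partition order. It is plain Python written under the assumption that partials are gathered in partition order. `ordered_reduce` is a hypothetical name, and nothing here is part of Spark's contract.

```python
from functools import reduce


def ordered_reduce(partitions, f):
    """Fold each non-empty partition, then fold the partials in partition order."""
    partials = [reduce(f, part) for part in partitions if part]
    return reduce(f, partials)


def concat(x, y):
    return x + y


lines = ["a", "b", "c", "d", "e", "f"]
two_parts = [lines[:3], lines[3:]]
four_parts = [lines[:2], lines[2:3], lines[3:5], lines[5:]]

# Reference result: a plain left fold over all lines in file order.
sequential = reduce(concat, lines)

# Concatenation is associative, so regrouping by partition changes nothing
# as long as the partials are merged in partition order.
same_with_two = ordered_reduce(two_parts, concat) == sequential
same_with_four = ordered_reduce(four_parts, concat) == sequential

print(same_with_two, same_with_four)
```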



Answer 2:

According to the code documentation, which was recently updated/corrected (thanks @zero323):

reduceByKey merges the values for each key using an associative and commutative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.

So it was in fact a documentation error, as @zero323 pointed out in his answer.

You can check the following links to the code to make sure:

  • https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L304

  • https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L1560