Spark's documentation says that the RDD method reduce
requires an associative AND commutative binary function.
However, the method reduceByKey
ONLY requires an associative binary function.
I ran some tests, and that does appear to be the behavior I get. Why the difference? Why does reduceByKey
ensure the binary function is always applied in a certain order (to accommodate the lack of commutativity) when reduce
does not?
For example, if I load a (small) text file with at least 4 partitions:
val r = sc.textFile("file4k", 4)
then:
r.reduce(_ + _)
returns a string where parts are not always in the same order, whereas:
r.map(x => (1,x)).reduceByKey(_ + _).first
always returns the same string (where everything is in the same order as in the original file).
(I checked with r.glom
and the file content is indeed spread over 4 partitions, there is no empty partition).
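The effect in the experiment above can be reproduced without a cluster. Below is a minimal sketch in plain Scala (no Spark), assuming as a simplification that each partition is reduced locally and the partial results are then merged in whatever order they arrive. `ReduceOrderSketch`, `reduceInOrder`, and the sample data are hypothetical names made up for this illustration, not part of the Spark API.

```scala
object ReduceOrderSketch {
  // Hypothetical stand-in for a small RDD split into 4 non-empty partitions.
  val partitions: Seq[Seq[String]] =
    Seq(Seq("a", "b"), Seq("c", "d"), Seq("e", "f"), Seq("g", "h"))

  // Reduce every partition locally, then merge the partial results
  // in the order given by `arrival` (a permutation of partition indices).
  def reduceInOrder[T](parts: Seq[Seq[T]], arrival: Seq[Int])(f: (T, T) => T): T =
    arrival.map(i => parts(i).reduce(f)).reduce(f)

  def main(args: Array[String]): Unit = {
    // String concatenation is associative but not commutative,
    // so the arrival order of the partial results is visible in the output.
    println(reduceInOrder(partitions, Seq(0, 1, 2, 3))(_ + _)) // abcdefgh
    println(reduceInOrder(partitions, Seq(2, 0, 3, 1))(_ + _)) // efabghcd

    // Integer addition is associative and commutative,
    // so every arrival order gives the same sum.
    val nums = Seq(Seq(1, 2), Seq(3, 4))
    println(reduceInOrder(nums, Seq(0, 1))(_ + _)) // 10
    println(reduceInOrder(nums, Seq(1, 0))(_ + _)) // 10
  }
}
```

Under this assumption, a function that is only associative gives a stable result only when the arrival order happens to be stable, which is not something the sketch (or, as far as I know, Spark) promises.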
As far as I am concerned, this is an error in the documentation, and the results you see are simply incidental. Practice, other resources, and a simple analysis of the code show that the function passed to reduceByKey
should be not only associative but commutative as well.
practice - while it looks like the order is preserved in local mode, this is no longer true when you run Spark on a cluster, including standalone mode.
other resources - to quote Data Exploration Using Spark from AmpCamp 3:
There is a convenient method called reduceByKey in Spark for exactly this pattern. Note that the second argument to reduceByKey determines the number of reducers to use. By default, Spark assumes that the reduce function is commutative and associative and applies combiners on the mapper side.
code - reduceByKey is implemented using combineByKeyWithClassTag and creates a ShuffledRDD. Since Spark doesn't guarantee the order after shuffling, the only way to restore it would be to attach some metadata to the partially reduced records. As far as I can tell, nothing like this takes place.
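The argument about the shuffle can be made concrete with a rough model in plain Scala. This is only a sketch under stated assumptions, not Spark's implementation: it assumes values are combined per key inside each partition first, and that the combined records may then reach the reducer in any order. `ReduceByKeySketch`, `combineLocally`, and `possibleResults` are hypothetical names introduced for the illustration.

```scala
object ReduceByKeySketch {
  // Map-side combine: reduce the values of each key within one partition.
  // groupBy keeps the original order of the values inside each group.
  def combineLocally[K, V](partition: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
    partition.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }

  // Reduce side: merge the per-partition partial results for `key` in every
  // possible arrival order and collect the distinct outcomes.
  def possibleResults[K, V](partitions: Seq[Seq[(K, V)]], key: K)(f: (V, V) => V): Set[V] = {
    val partials = partitions.flatMap(p => combineLocally(p)(f).get(key))
    partials.permutations.map(_.reduce(f)).toSet
  }

  def main(args: Array[String]): Unit = {
    // Associative but not commutative: the outcome depends on arrival order.
    val lines = Seq(Seq(1 -> "a", 1 -> "b"), Seq(1 -> "c"), Seq(1 -> "d"))
    println(possibleResults(lines, 1)(_ + _).size) // 6 distinct strings

    // Associative and commutative: a single outcome, whatever the order.
    val nums = Seq(Seq(1 -> 1, 1 -> 2), Seq(1 -> 3), Seq(1 -> 4))
    println(possibleResults(nums, 1)(_ + _)) // Set(10)
  }
}
```

In this model the only way to get a single outcome with string concatenation would be to carry the partition index along with each partial result and sort before merging, which is the kind of metadata mentioned above.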
On a side note, reduce
as it is implemented in PySpark will work just fine with a function which is only commutative. It is, of course, just an implementation detail and not part of the contract.
According to the code documentation, which was recently updated and corrected (thanks @zero323):
reduceByKey
merges the values for each key using an associative and commutative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
So it was in fact a documentation error, as @zero323 pointed out in his answer.
You can check the following links to the code to make sure: