In Spark's documentation, it says that the RDD method reduce
requires an associative AND commutative binary function.
However, the method reduceByKey
ONLY requires an associative binary function.
sc.textFile("file4kB", 4)
I did some tests, and apparently this is the behavior I get. Why this difference? Why does reduceByKey
ensure the binary function is always applied in a certain order (to accommodate the lack of commutativity) while reduce
does not?
For example, if I load some (small) text with 4 partitions (minimum):
val r = sc.textFile("file4kB", 4)
then:
r.reduce(_ + _)
returns a string where parts are not always in the same order, whereas:
r.map(x => (1,x)).reduceByKey(_ + _).first
always returns the same string (where everything is in the same order as in the original file).
(I checked with r.glom and the file content is indeed spread over 4 partitions; there is no empty partition.)
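Here is roughly the test I ran (just a sketch; the 20 repetitions are arbitrary):

// run the same job many times and keep the distinct results (r as defined above)
val fromReduce = (1 to 20).map(_ => r.reduce(_ + _)).toSet
val fromReduceByKey = (1 to 20).map(_ => r.map(x => (1, x)).reduceByKey(_ + _).first._2).toSet
println(fromReduce.size)       // more than 1 for me: the concatenation order varies
println(fromReduceByKey.size)  // always 1 in my local tests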
As far as I am concerned, this is an error in the documentation and the results you see are simply incidental. Practice, other resources, and a simple analysis of the code show that the function passed to reduceByKey should be not only associative but commutative as well.

practice - while it looks like the order is preserved in local mode, this is no longer true when you run Spark on a cluster, including standalone mode.
other resources - to quote Data Exploration Using Spark from AmpCamp 3:
code - reduceByKey is implemented using combineByKeyWithClassTag and creates a ShuffledRDD. Since Spark doesn't guarantee the order after shuffling, the only way to restore it would be to attach some metadata to the partially reduced records. As far as I can tell, nothing like this takes place.
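To see why that matters, here is a minimal Spark-free sketch (the strings are made up and stand in for per-partition combiners):

// partial results produced independently on each partition
val partials = Seq("part0 ", "part1 ", "part2 ", "part3 ")
// after a shuffle, Spark makes no promise about the merge order,
// so both of the following are legal outcomes
val a = partials.reduceLeft(_ + _)          // "part0 part1 part2 part3 "
val b = partials.reverse.reduceLeft(_ + _)  // "part3 part2 part1 part0 "
// string concatenation is associative but not commutative, hence a != b
println(a == b)  // false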
On a side note, reduce as it is implemented in PySpark will work just fine with a function which is only commutative. That is of course just an implementation detail and not part of the contract.

According to the code documentation, which was recently updated/corrected (thanks @zero323):

"Merge the values for each key using an associative and commutative reduce function."
So it was in fact a documentation error, as @zero323 pointed out in his answer.
You can check the following links to the code to make sure:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L304
https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L1560
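If what you actually need is the content in its original file order, don't lean on reduce at all. A sketch of one order-preserving alternative (illustration only; it assumes the collected result fits in driver memory):

// make the order explicit instead of relying on reduce:
// zipWithIndex numbers elements in partition order, which for
// textFile follows the original file order
val orderedConcat = r
  .zipWithIndex()   // (line, position)
  .sortBy(_._2)     // restore that order explicitly
  .map(_._1)
  .collect()        // the sample file is tiny, so collecting is fine
  .mkString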