Why Spark's repartition didn't balance the data

Posted 2020-02-29 11:34

Question:

>>> rdd = sc.parallelize(range(10), 2)
>>> rdd.glom().collect()
[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
>>> rdd.repartition(3).glom().collect()
[[], [0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
>>>

Why is the first partition empty? I would really appreciate an explanation.

Answer 1:

That happens because Spark doesn't shuffle individual elements but rather blocks of data, with a minimum batch size of 10.

So if you have fewer elements than that per partition, Spark won't separate the contents of the partitions.
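A quick way to see the batching effect is to give each source partition several batches' worth of elements. This is only a sketch, run in the same pyspark shell as above (so sc already exists); the exact sizes after the shuffle can vary between runs:

>>> big = sc.parallelize(range(100), 2)           # 50 elements per source partition
>>> big.glom().map(len).collect()                 # [50, 50]
>>> big.repartition(3).glom().map(len).collect()  # with several batches to move, all three target partitions get data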



Answer 2:

This can be explained by taking a look at how the repartition function works. Calling df.repartition(k, COL) creates a dataframe with k partitions using a hash-based partitioner. PySpark goes through every row and applies the following function to determine which partition the current row will end up in:

partition_the_row_belongs_to = hash(COL) % k

The k in this case is used to map the rows into a space of k partitions. As you can see, hash functions sometimes collide: some partitions end up empty whilst others get too many elements. This can happen because of hash collisions or simply because of how the hash function distributes the values. Either way, the reason for what you are seeing is that repartition created the 3 partitions you asked for; it doesn't promise anything about the partitions being balanced or non-empty. If you want more control over what the resulting partitions look like, look at partitionBy.
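As a rough illustration of the hash-and-modulo idea, here is a plain-Python sketch. Python's built-in hash is only a stand-in for Spark's internal hash function, and the column values are made up, so the exact assignment will differ from what Spark actually does:

>>> from collections import Counter
>>> k = 3
>>> values = ["a", "b", "c", "d", "e"]    # hypothetical values of COL
>>> Counter(hash(v) % k for v in values)  # row count per bucket; some of the k buckets can end up empty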

See also: this question and this question.

I hope that helps.



Answer 3:

It is worth noting that, since Spark is all about running at scale, this is an unlikely scenario to worry about. The closest you get to it in practice is skewed data. range gives a different initial partitioning than repartition, which uses hashing. The comment on batch size above is also valid, but less relevant in practice.
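As a small sketch of the skew case, run in the same pyspark shell: partitionBy with a degenerate partition function piles everything into one partition, and repartition then spreads it out again (exact sizes after the shuffle may vary):

>>> pairs = sc.parallelize([(i, i) for i in range(100)])
>>> skewed = pairs.partitionBy(4, lambda key: 0)     # force every record into partition 0
>>> skewed.glom().map(len).collect()                 # all 100 records in the first partition: [100, 0, 0, 0]
>>> skewed.repartition(4).glom().map(len).collect()  # data spread across partitions again; sizes roughly even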