Why do I get so many empty partitions when repartitioning a dataframe on columns in Spark?

Question:

I want to partition a dataframe "df1" on 3 columns. This dataframe has exactly 990 unique combinations of those 3 columns:

In [17]: df1.createOrReplaceTempView("df1_view")

In [18]: spark.sql("select count(*) from (select distinct(col1,col2,col3) from df1_view) as t").show()
+--------+                                                                      
|count(1)|
+--------+
|     990|
+--------+

To optimize the processing of this dataframe, I want to repartition df1 into 990 partitions, one per possible key combination:

In [19]: df1.rdd.getNumPartitions()
Out[19]: 24

In [20]: df2 = df1.repartition(990, "col1", "col2", "col3")

In [21]: df2.rdd.getNumPartitions()
Out[21]: 990

I wrote a simple way to count rows in each partition:

In [22]: def f(iterator):
    ...:     # count the rows of the partition this function receives
    ...:     a = 0
    ...:     for row in iterator:
    ...:         a = a + 1
    ...:     print(a)
    ...: 

In [23]: df2.foreachPartition(f)

What I actually get is 628 partitions containing one or more key values, and 362 empty partitions.
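(Side note: print inside foreachPartition ends up in the executors' stdout rather than the notebook. A minimal alternative sketch, not part of the original test but reusing the same df2, that gathers the counts on the driver with spark_partition_id:)

from pyspark.sql import functions as F

# Tag every row with the id of the partition it lives in, then aggregate.
counts = df2.withColumn("pid", F.spark_partition_id()).groupBy("pid").count()

non_empty = counts.count()
print("non-empty partitions:", non_empty)
print("empty partitions:", df2.rdd.getNumPartitions() - non_empty)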

I assumed Spark would repartition the data evenly (1 key value = 1 partition), but that doesn't seem to be the case, and I feel like this repartitioning is adding data skew even though it should be the other way around...

What algorithm does Spark use to partition a dataframe on columns? Is there a way to achieve what I thought was possible?

I'm using Spark 2.2.0 on Cloudera.

Answer 1:

To distribute data across partitions, Spark needs some way to convert a column value into a partition index. There are two default partitioners in Spark: HashPartitioner and RangePartitioner. Different transformations in Spark can apply different partitioners, e.g. join applies a hash partitioner.

For the hash partitioner, the formula to convert a value into a partition index is essentially value.hashCode() % numOfPartitions. In your case, several key values map to the same partition index while other indexes receive no key at all, which is why some partitions are empty.
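To get a feel for why roughly a third of the partitions end up empty, here is a minimal simulation, not Spark's actual code (Spark hashes the partition columns with Murmur3, but any reasonably uniform hash behaves the same way): hashing 990 distinct keys into 990 buckets leaves about (1 - 1/990)^990 ≈ 1/e ≈ 37% of the buckets empty, i.e. roughly 364, very close to the 362 you observed.

import random

# Minimal simulation (not Spark's actual code): hash 990 distinct keys
# into 990 buckets with a uniform hash and count the buckets left empty.
num_keys = 990
num_partitions = 990

random.seed(0)
# Stand-in for value.hashCode(): a fixed pseudo-random integer per key.
hashes = [random.getrandbits(32) for _ in range(num_keys)]
occupied = {h % num_partitions for h in hashes}

print("non-empty partitions:", len(occupied))
print("empty partitions:", num_partitions - len(occupied))
# Expected fraction of empty buckets: (1 - 1/n)**n ~ 1/e ~ 0.37,
# i.e. about 364 empty partitions out of 990.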

You could implement your own partitioner if you want a better distribution. More about it here, here and here.
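A minimal sketch of that idea, assuming the 990 distinct (col1, col2, col3) combinations fit comfortably in driver memory; the DataFrame API does not accept a custom partitioner, so this drops down to the RDD API (names such as key_to_index and exact_partitioner are illustrative, not from the answer):

# Collect the distinct key combinations and give each one its own index.
keys = df1.select("col1", "col2", "col3").distinct().collect()
key_to_index = {tuple(r): i for i, r in enumerate(keys)}

def exact_partitioner(key):
    # One partition per distinct (col1, col2, col3) combination.
    return key_to_index[key]

# Key every row by its combination, partition with the exact mapping,
# then rebuild a DataFrame with the original schema.
keyed = df1.rdd.map(lambda row: ((row["col1"], row["col2"], row["col3"]), row))
repartitioned = keyed.partitionBy(len(key_to_index), exact_partitioner)
df2 = spark.createDataFrame(repartitioned.values(), schema=df1.schema)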