Question:

I have a dataframe with 500 partitions, and it is shuffled. I want to repartition it based on one column, say 'city'. But the city column is extremely skewed, as it has only three possible values. So when I repartition based on the city column, even if I specify 500 partitions, only three of them get data, and because of this I am running into performance issues. I searched the internet but could not find a suitable solution. Is there a way to repartition the dataframe uniformly across partitions based on the city column? What I need is: city1 goes to, say, the first 5 partitions, city2 goes to the next 490 partitions, and city3 goes to the remaining 5 partitions.
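To see the symptom concretely, here is a minimal sketch with a toy dataframe (spark is an active SparkSession; the data is illustrative, not the asker's):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy stand-in for the skewed data: only three distinct city values.
df = spark.createDataFrame(
    [(i, "city%d" % (i % 3)) for i in range(1000)], ["id", "city"]
)

# Count rows per partition after hash-repartitioning on "city".
sizes = df.repartition(500, "city").rdd.glom().map(len).collect()
print(sum(1 for n in sizes if n > 0))  # at most 3 non-empty partitions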
Answer 1:
When we've encountered data with known skew, we've used a partitioner that applies controlled randomization for the skewed values. I outline how this can be done in this answer.
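A minimal PySpark sketch of the controlled-randomization (salting) idea, assuming df is the skewed dataframe from the question; the 5/490/5 bucket split below simply mirrors the numbers the asker wants, and hash partitioning only spreads rows approximately, not exactly:

from pyspark.sql import functions as F

# Partitions each city should spread across (illustrative 5/490/5 split).
buckets = {"city1": 5, "city2": 490, "city3": 5}

# Per-row upper bound for the salt, chosen by city.
bucket_col = F.coalesce(
    *[F.when(F.col("city") == c, F.lit(n)) for c, n in buckets.items()]
)

# Controlled randomization: salt in [0, buckets[city]), then hash on
# (city, salt) so the heavy city maps to ~490 distinct keys instead of one.
salted = df.withColumn("salt", (F.rand() * bucket_col).cast("int"))
df_balanced = salted.repartition(500, "city", "salt")

Note that anything downstream that needs all rows of a city together has to aggregate across the salt values again.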
Answer 2:
You can repartition into 500 partitions by specifying 1 or more columns (2 in this case). For example (pyspark):
from datetime import datetime

# sq is an existing SparkSession (or SQLContext on older Spark versions)
file_x = "hdfs://mycluster/user/data/x"
df_x = sq.read.format("com.databricks.spark.avro").load(file_x)
print(str(datetime.now()) + ": FileX partitions: " + str(df_x.rdd.getNumPartitions()))

# repartition based on 2 columns, so rows are hashed on (CITY, ADDRESS)
# pairs rather than on CITY alone
df_y = df_x.repartition(500, "CITY", "ADDRESS")
print(str(datetime.now()) + ": FileY partitions: " + str(df_y.rdd.getNumPartitions()))
See docs for more
Answer 3:
Use the DISTRIBUTE BY clause on the dataframe. To deal with the skew as per your requirement, you can repartition your data using distribute by.
For the expression to partition by, choose something that you know will evenly distribute the data.
df.repartition(30, $"<expression>")  // the Dataset API has no distributeBy; repartition with a column expression is the equivalent
In the expression, you randomize the result using something like city.toString().length > Random.nextInt(<avg-city-length>)
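For example, here is a hedged PySpark sketch of the randomized-expression approach (the salt range of 50 and the names df and spark are assumptions for illustration):

from pyspark.sql import functions as F

# DataFrame form: repartition on (city, random salt in [0, 50)).
df_spread = df.repartition(500, F.col("city"), (F.rand() * 50).cast("int"))

# The same idea through SQL's DISTRIBUTE BY clause; here the partition
# count comes from spark.sql.shuffle.partitions.
df.createOrReplaceTempView("t")
df_spread_sql = spark.sql(
    "SELECT * FROM t DISTRIBUTE BY city, CAST(rand() * 50 AS INT)"
)

Keep in mind that rand() is re-evaluated if a stage is recomputed, so cache the result if stable row placement matters.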