How to repartition a dataframe in Spark Scala on a skewed column

Posted 2019-08-02 19:47

Question:

I have a dataframe with 500 partitions that has been shuffled. I want to repartition it based on one column, say 'city'. But the city column is extremely skewed: it has only three possible values. So when I repartition based on the city column, even if I specify 500 partitions, only three of them get data. Because of this I am running into performance issues. I searched on the internet but could not find a suitable solution. Is there a way to repartition the dataframe uniformly across partitions based on the city column? What I need is: city1 goes to, say, the first 5 partitions, city2 goes to the next 490 partitions, and city3 goes to the remaining 5 partitions.

Answer 1:

When we've encountered data with known skew, we've used a partitioner that applies controlled randomization for the skewed values. I outline how this can be done in this answer.
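As a minimal sketch of that idea (not the linked answer's exact code), one common form of controlled randomization is "salting": add a random salt column and repartition on (city, salt) so each skewed city value is spread over many partitions instead of one. The names df, "city", and saltBuckets below are assumptions for illustration.

import org.apache.spark.sql.functions.{col, floor, rand}

// df is your skewed dataframe, "city" the skewed column (hypothetical names)
val saltBuckets = 100                               // sub-partitions per city value; tune to your data
val salted = df
  .withColumn("salt", floor(rand() * saltBuckets))  // random bucket 0 .. saltBuckets-1
  .repartition(500, col("city"), col("salt"))       // spread each city across many partitions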



Answer 2:

You can repartition into 500 partitions by specifying one or more columns (two in this case). For example, in PySpark:

from datetime import datetime

file_x = "hdfs://mycluster/user/data/x"
# sq is an existing SQLContext / SparkSession
df_x = sq.read.format("com.databricks.spark.avro").load(file_x)

print(str(datetime.now()) + ": FileX partitions: " + str(df_x.rdd.getNumPartitions()))

# repartition into 500 partitions based on 2 columns
df_y = df_x.repartition(500, "CITY", "ADDRESS")

print(str(datetime.now()) + ": FileY partitions: " + str(df_y.rdd.getNumPartitions()))

See the docs for more.
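Since the question asks about Scala, a roughly equivalent sketch (assuming a SparkSession named spark and the same hypothetical path and column names) would be:

import java.time.LocalDateTime

val fileX = "hdfs://mycluster/user/data/x"
val dfX = spark.read.format("com.databricks.spark.avro").load(fileX)

println(s"${LocalDateTime.now()}: FileX partitions: ${dfX.rdd.getNumPartitions}")

// repartition into 500 partitions based on 2 columns
val dfY = dfX.repartition(500, dfX("CITY"), dfX("ADDRESS"))

println(s"${LocalDateTime.now()}: FileY partitions: ${dfY.rdd.getNumPartitions}")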



Answer 3:

Use the DISTRIBUTE BY clause on the dataframe.

To deal with the skew, you can repartition your data using DISTRIBUTE BY (its DataFrame equivalent is repartition with column expressions).

For the expression to partition by, choose something that you know will evenly distribute the data.

df.repartition(30, $"<expression>")  // DataFrame API equivalent of DISTRIBUTE BY; there is no distributeBy method

In the expression, you randomize the result using something like city.toString().length > Random.nextInt(<avg-city-length>)
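For a concrete (hypothetical) sketch of the SQL form, assuming an existing SparkSession named spark and your data registered as a temp view called events with the skewed city column:

// Salted DISTRIBUTE BY: the random term spreads each city value over ~100 partitions
spark.sql("SET spark.sql.shuffle.partitions = 500")
val distributed = spark.sql(
  """SELECT *
    |FROM events
    |DISTRIBUTE BY city, CAST(rand() * 100 AS INT)""".stripMargin)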