Spark join *without* shuffle

Posted 2020-07-25 23:15

Question:

I am trying to optimise a join in my Spark application.

I tried to understand the points from this question: How to avoid shuffles while joining DataFrames on unique keys?

  1. I have made sure that the keys on which the join has to happen fall into the same partitions (using my custom partitioner).

  2. I also cannot do a broadcast join because my data may become large depending on the situation.

  3. In the answer to the above-mentioned question, repartitioning only optimises the join, but what I need is a join WITHOUT A SHUFFLE. I am fine with the join being performed using the keys already co-located within each partition.

Is this possible? I want to implement something like joinperpartition myself if similar functionality does not exist.

Answer 1:

repartitioning only optimises the join, but what I need is a join WITHOUT A SHUFFLE

This is not true. Repartitioning does not merely "optimize" the join. Repartitioning binds a Partitioner to your RDD, which is the key component for a map-side join.

I have made sure that the keys on which the join has to happen fall into the same partitions

Spark must know about this. Build your DataFrames with the appropriate APIs so that they have the same Partitioner, and Spark will take care of the rest.
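
For example, a rough DataFrame-level sketch (df_large_1, df_large_2 and the column 'key' are made-up names): if both sides are explicitly repartitioned on the join key with the same number of partitions, the sort-merge join can reuse that partitioning and should not insert an extra shuffle of its own.

# Hypothetical DataFrames df_large_1 / df_large_2 sharing a join column 'key'.
num_parts = 200

# Repartition both sides on the join key with the SAME number of partitions,
# so both children expose the same hash partitioning to the join.
left = df_large_1.repartition(num_parts, 'key')
right = df_large_2.repartition(num_parts, 'key')

joined = left.join(right, on='key', how='inner')

# The physical plan should show one Exchange per side (the explicit repartition)
# and no additional Exchange inserted for the SortMergeJoin itself.
joined.explain()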



Answer 2:

Just an addition to the previous good answers: if you are joining a big DataFrame multiple times throughout your PySpark application, save that table as a bucketed table and read it back as a DataFrame. This way you can avoid repeated shuffles during the joins, because the data is already pre-shuffled and sorted.

So when Spark chooses a sort-merge join on two large DataFrames, it will skip the sort and shuffle phases during your join operations. (You can confirm this in the Spark UI by looking at the WholeStageCodegen plan.)

# Write both DataFrames out as bucketed, sorted tables, using the same
# bucket count and the same bucketing column on both sides.
# (Note: for the join's sort phase to be skipped as well, sortBy should use
# the join column; sorting by a different column only avoids the shuffle.)
df_data_1.coalesce(1).write.format('orc').bucketBy(20, 'joincolumn').sortBy('sortcolumn').mode('overwrite').saveAsTable('bucketed_table1')
df_data_2.coalesce(1).write.format('orc').bucketBy(20, 'joincolumn').sortBy('sortcolumn').mode('overwrite').saveAsTable('bucketed_table2')

# Read them back as DataFrames.
df_bucket_table_1 = spark.table('bucketed_table1')
df_bucket_table_2 = spark.table('bucketed_table2')

# Rule out broadcast joins and prefer sort-merge join so the bucketing is used.
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', -1)
spark.conf.set('spark.sql.join.preferSortMergeJoin', 'true')

# Create aliases for the DataFrames.
df1 = df_bucket_table_1.alias('df1')
df2 = df_bucket_table_2.alias('df2')

DfInnerJoin = df1.join(df2, df1.joincolumn == df2.joincolumn, 'inner').select('df1.*')

The above join will involve no shuffling, but this is useful only when you have to join the same DataFrame multiple times throughout the application.
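
If you want to verify that the bucketing is actually being picked up (a quick sanity check, reusing the table and DataFrame names from the snippet above):

# The physical plan should show a SortMergeJoin with no Exchange (shuffle)
# under either side when the bucketed tables are used.
DfInnerJoin.explain()

# The saved table metadata also shows the bucket and sort columns.
spark.sql("DESCRIBE EXTENDED bucketed_table1").show(50, truncate=False)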