I am trying to optimise a join in my Spark application.
I tried to understand the points from this question: How to avoid shuffles while joining DataFrames on unique keys?
I have made sure that the keys on which the join has to happen are co-located in the same partitions (using my custom partitioner).
I also cannot do a broadcast join because my data may become large depending on the situation.
In the answer to the above question, repartitioning only optimises the join, but what I need is a join WITHOUT A SHUFFLE. A join that works partition by partition, using the keys already co-located within each partition, is fine with me.
Is this possible? If no such functionality exists, I want to implement something like a joinperpartition myself.
repartitioning only optimises the join, but what I need is a join WITHOUT A SHUFFLE
This is not true. Repartitioning does more than just "optimize" the join: it binds a Partitioner to your RDD, which is the key component for a map-side join.
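At the RDD level, a minimal sketch of that idea looks like the following (the keys, values and partition count here are made up purely for illustration):

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Two hypothetical pair RDDs keyed by the join key.
rdd_a = sc.parallelize([(i, "a%d" % i) for i in range(1000)])
rdd_b = sc.parallelize([(i, "b%d" % i) for i in range(1000)])

# Give both sides the same (hash) partitioner and partition count,
# and cache them so the partitioning is reused across joins.
part_a = rdd_a.partitionBy(8).cache()
part_b = rdd_b.partitionBy(8).cache()

# Because both RDDs are co-partitioned, this join can be performed
# partition-by-partition without triggering another shuffle.
joined = part_a.join(part_b)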
I have made sure that the keys on which the join has to happen are co-located in the same partitions
Spark must know about this. Build your DataFrames with the appropriate APIs so that they share the same Partitioner, and Spark will take care of the rest.
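For DataFrames, a rough sketch of the same approach, assuming both sides are repartitioned on the join key with the same partition count so that the join's required distribution is already satisfied (whether the extra exchange is actually elided can depend on the Spark version, so check the plan):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical DataFrames sharing a join column called "key".
df_left = spark.range(0, 1000000).withColumnRenamed("id", "key")
df_right = spark.range(0, 1000000).withColumnRenamed("id", "key")

# Hash-partition both sides on the join key with the same number of partitions.
left_part = df_left.repartition(200, "key")
right_part = df_right.repartition(200, "key")

joined = left_part.join(right_part, "key")

# The physical plan should show no additional Exchange for the join itself.
joined.explain()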
Just an addition to the previous good answers.
If you are joining a big DataFrame multiple times throughout your PySpark application, save it as a bucketed table and read it back as a DataFrame. That way you can avoid repeated shuffles during the joins, because the data is already pre-shuffled and sorted.
So when Spark chooses a sort-merge join on two large DataFrames, it can skip the shuffle phase (and the sort, if the buckets are also sorted on the join column) during your join operations. (You can confirm this in the Spark UI by looking at WholeStageCodegen.)
# Write both DataFrames out as bucketed, sorted tables on the join column;
# coalesce(1) limits the number of files written per bucket.
df_data_1.coalesce(1).write.format('orc').bucketBy(20, 'joincolumn').sortBy("sortcolumn").mode("overwrite").saveAsTable('bucketed_table1')
df_data_2.coalesce(1).write.format('orc').bucketBy(20, 'joincolumn').sortBy("sortcolumn").mode("overwrite").saveAsTable('bucketed_table2')

# Read the bucketed tables back as DataFrames.
df_bucket_table_1 = spark.table("bucketed_table1")
df_bucket_table_2 = spark.table("bucketed_table2")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.join.preferSortMergeJoin","true")
# Create aliases for the DataFrames:
from pyspark.sql.functions import *
df1 = df_bucket_table_1.alias('df1')
df2 = df_bucket_table_2.alias('df2')
DfInnerJoin = df1.join(df2, df1.joincolumn == df2.joincolumn, 'inner').select('df1.*')
The join above will involve no shuffling, but this is useful only when you have to join the same DataFrames multiple times throughout the application.
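As a complement to the Spark UI check mentioned above, you can also inspect the physical plan directly; if the bucketing was picked up, there should be no Exchange node on either side of the SortMergeJoin:

# No Exchange should appear under the SortMergeJoin if bucketing is used.
DfInnerJoin.explain()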