I am seeing some performance issues while running queries using DataFrames. From my research, a long running final task can be a sign that the data is not distributed optimally, but I have not found a detailed process for resolving this issue.
I start by loading two tables as DataFrames and then join them on one field. I have tried adding distribute by (repartition) and sort by to improve the performance, but I am still seeing this single long running final task. Here is a simplified version of my code; note that queries one and two are not actually this simple and use UDFs to calculate some values.
I have tried a few different settings for spark.sql.shuffle.partitions. I tried 100, but it failed (I didn't really debug this too much, to be honest). I then tried 300, 4000, and 8000, and performance decreased with each increase. I am selecting a single day of data, where each file is an hour.
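For reference, this is roughly how I am changing that setting (simplified):

// Adjust the number of partitions used for shuffles (joins, aggregations)
sqlContext.setConf("spark.sql.shuffle.partitions", "300")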
// Load the two source tables (the real queries are more complex and use UDFs)
val df1 = sqlContext.sql("Select * from Table1")
val df2 = sqlContext.sql("Select * from Table2")

// Repartition and sort each DataFrame by the join key
val distributeDf1 = df1
  .repartition(df1("userId"))
  .sortWithinPartitions(df1("userId"))

val distributeDf2 = df2
  .repartition(df2("userId"))
  .sortWithinPartitions(df2("userId"))

// Register as temp tables so they can be joined with SQL
distributeDf1.registerTempTable("df1")
distributeDf2.registerTempTable("df2")

val df3 = sqlContext
  .sql("""
    Select
      df1.*
    from
      df1
    left outer join df2 on
      df1.userId = df2.userId""")
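To check whether the join key itself is skewed (one userId, or a null/default value, holding a large share of the rows, which would explain a single long running task), I can look at the heaviest keys with something like this:

import org.apache.spark.sql.functions.desc

// Count rows per join key and show the heaviest keys
df1.groupBy("userId")
  .count()
  .orderBy(desc("count"))
  .show(20)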
Since it seems partitioning by userId is not ideal, I could partition by the timestamp instead. If I do this, should I just use the date + hour? If I have fewer than 200 unique combinations for this, will I have empty executors?
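In case it helps clarify what I mean, this is the kind of repartitioning I have in mind (the eventTime column name is just a placeholder for my actual timestamp field):

import org.apache.spark.sql.functions.{to_date, hour}

// Derive date and hour columns from the event timestamp and repartition on them.
// A single day gives only 24 date/hour combinations, far fewer than 200 shuffle partitions.
val withTime = df1
  .withColumn("eventDate", to_date(df1("eventTime")))
  .withColumn("eventHour", hour(df1("eventTime")))

val distributeByTime = withTime
  .repartition(withTime("eventDate"), withTime("eventHour"))
  .sortWithinPartitions(withTime("eventDate"), withTime("eventHour"))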