Below is the sample code I am running. When this Spark job runs, the DataFrame join is executed as a SortMergeJoin instead of a broadcast join.
def joinedDf(sqlContext: SQLContext,
             txnTable: DataFrame,
             countriesDfBroadcast: Broadcast[DataFrame]): DataFrame = {
  import sqlContext.implicits._  // for the $"..." column syntax

  txnTable.as("df1").join(
    countriesDfBroadcast.value
      .withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID")
      .as("countries"),
    $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID",
    "inner")
}

joinedDf(sqlContext, txnTable, countriesDfBroadcast).write.parquet("temp")
The broadcast join does not happen even when I specify a broadcast() hint in the join statement. The optimizer hash-partitions the DataFrame instead, which is causing data skew.
Has anyone seen this behavior?
I am running this on YARN using Spark 1.6, with HiveContext as the SQLContext. The Spark job runs on 200 executors; txnTable is 240 GB and countriesDf is 5 MB.
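For reference, one minimal way to see which join strategy the planner picked is to print the physical plan (a diagnostic sketch, not part of the job itself):

// Print the physical plan; in this case it shows SortMergeJoin
// rather than BroadcastHashJoin.
joinedDf(sqlContext, txnTable, countriesDfBroadcast).explain()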
Both the way you broadcast the DataFrame and the way you access it are incorrect.

Standard broadcast variables cannot be used to handle distributed data structures. If you want to perform a broadcast join on a DataFrame, you should use the broadcast function, which marks the given DataFrame for broadcasting.
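A minimal sketch of that approach for this job, assuming txnTable and countriesDf are in scope as in the question (tmp is the countries DataFrame after it has been marked for broadcasting):

import org.apache.spark.sql.functions.broadcast

// Mark the small side for broadcasting. broadcast() is only a planner
// hint; no data is moved at this point.
val tmp: DataFrame = broadcast(
  countriesDf.withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries")
)

// With the hint in place, the planner can choose a broadcast join.
txnTable.as("df1").join(
  tmp,
  $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID",
  "inner")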
Internally it will collect tmp without converting from the internal format, and broadcast it afterwards.

join arguments are eagerly evaluated. Even if it were possible to use SparkContext.broadcast with a distributed data structure, the broadcast value would be evaluated locally before join is called. That's why your function works at all, but doesn't perform a broadcast join.
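To make the eager-evaluation point concrete, here is roughly what the question's function reduces to (a sketch; plainDf is a hypothetical name introduced for illustration):

// countriesDfBroadcast.value is evaluated eagerly, on the driver,
// before join() is ever invoked...
val plainDf: DataFrame = countriesDfBroadcast.value

// ...so by the time the planner sees the join, it is just an ordinary
// DataFrame with no broadcast hint, and it falls back to SortMergeJoin.
txnTable.as("df1").join(
  plainDf.withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries"),
  $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID",
  "inner")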