Broadcast not happening while joining dataframes i

2019-01-19 21:09发布

问题:

Below is the sample code that I am running. when this spark job runs, Dataframe joins are happening using sortmergejoin instead of broadcastjoin.

def joinedDf (sqlContext: SQLContext,
              txnTable:   DataFrame,
              countriesDfBroadcast: Broadcast[DataFrame]): 
              DataFrame = {
                    txnTable.as("df1").join((countriesDfBroadcast.value).withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries"),
                    $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner")
              }
joinedDf(sqlContext, txnTable, countriesDfBroadcast).write.parquet("temp")  

The broadcastjoin is not happening even when I specify a broadcast() hint in the join statement.

The optimizer is hashpartitioning the dataframe and it is causing data skew.

Has anyone seen this behavior?

I am running this on yarn using Spark 1.6 and HiveContext as SQLContext. The spark job runs on 200 executors. and the data size of the txnTable is 240GB and the datasize of countriesDf is 5mb.

回答1:

Both the way how you broadcast DataFrame and how you access it are incorrect.

  • Standard broadcasts cannot be used to handle distributed data structures. If you want to perform broadcast join on a DataFrame you should use broadcast functions which marks given DataFrame for broadcasting:

    import org.apache.spark.sql.functions.broadcast
    
    val countriesDf: DataFrame = ???
    val tmp: DataFrame = broadcast(
      countriesDf.withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries")
    ) 
    
    txnTable.as("df1").join(
      broadcast(tmp), $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner")
    

    Internally it will collect tmp without converting from internal and broadcast afterwards.

  • join arguments are eagerly evaluated. Even it was possible to use SparkContext.broadcast with distributed data structure broadcast value is evaluated locally before join is called. Thats' why your function work at all but doesn't perform broadcast join.