Why my BroadcastHashJoin is slower than ShuffledHa

Posted 2019-01-09 17:47

I execute a join using a JavaHiveContext in Spark.

The big table is 1.76 GB and has 100 million records.

The second table is 273 MB and has 10 million records.

I get a JavaSchemaRDD and call count() on it:

String query="select attribute7,count(*) from ft,dt where ft.chiavedt=dt.chiavedt group by attribute7";

JavaSchemaRDD rdd=sqlContext.sql(query);

System.out.println("count="+rdd.count());

If I force a BroadcastHashJoin (SET spark.sql.autoBroadcastJoinThreshold=290000000) and use 5 executors on 5 nodes with 8 cores and 20 GB of memory, the query runs in 100 seconds. If I don't force the broadcast, it runs in 30 seconds.

N.B. The tables are stored as Parquet files.

1 answer
男人必须洒脱
#2 · 2019-01-09 18:24

The most likely source of the problem is the cost of broadcasting. To keep things simple, let's assume that you have 1800 MB in the larger RDD and 300 MB in the smaller one. Assuming 5 executors and no previous partitioning, a fifth of all the data should already be on the correct machine. That leaves ~1700 MB to shuffle in the case of a standard join.

For a broadcast join, the smaller RDD has to be transferred to all nodes, which means around 1500 MB of data to transfer. If you add the required communication with the driver, you end up moving a comparable amount of data in a much more expensive way: broadcast data has to be collected first, and only after that can it be forwarded to all the workers.
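The arithmetic behind these estimates can be written out as a rough sketch. This is only a back-of-envelope model, not anything Spark computes: the class and method names are made up for illustration, the sizes are the rounded figures used above, and counting the driver collect as exactly one extra copy of the small table is an assumption.

```java
// Back-of-envelope model of the transfer estimates; hypothetical names, not a Spark API.
final class JoinTransferEstimate {

    // Shuffle join: with no prior partitioning, roughly 1/executors of all
    // rows already sit on the correct machine; the rest crosses the network.
    static long shuffleJoinMb(long largeMb, long smallMb, int executors) {
        long totalMb = largeMb + smallMb;
        return totalMb * (executors - 1) / executors;
    }

    // Broadcast join: one full copy of the small table is sent to every executor.
    static long broadcastFanOutMb(long smallMb, int executors) {
        return smallMb * executors;
    }

    // Assumption: collecting the small table on the driver first costs
    // one additional copy on top of the fan-out.
    static long broadcastTotalMb(long smallMb, int executors) {
        return smallMb + broadcastFanOutMb(smallMb, executors);
    }

    public static void main(String[] args) {
        long largeMb = 1800; // rounded size of the big table
        long smallMb = 300;  // rounded size of the small table
        int executors = 5;

        System.out.println("shuffle join:        " + shuffleJoinMb(largeMb, smallMb, executors) + " MB");   // 1680 MB
        System.out.println("broadcast fan-out:   " + broadcastFanOutMb(smallMb, executors) + " MB");        // 1500 MB
        System.out.println("broadcast + collect: " + broadcastTotalMb(smallMb, executors) + " MB");         // 1800 MB
    }
}
```

Under these assumptions the two strategies move roughly the same volume, so the difference in run time would come from how the data moves (through the driver, in sequence) rather than from how much of it moves.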
