星火SQL广播哈希联接星火SQL广播哈希联接(Spark SQL broadcast hash jo

2019-05-12 06:12发布

我试图使用SparkSQL dataframes执行广播哈希联接如下记载: https://docs.cloud.databricks.com/docs/latest/databricks_guide/06%20Spark%20SQL%20%26%20DataFrames/05% 20BroadcastHashJoin%20-%20scala.html

在这个例子中,(小) DataFrame通过saveAsTable依然存在,再有就是通过火花SQL联接(即通过sqlContext.sql("..."))

我的问题是,我需要使用sparkSQL API来构建我的SQL(我左侧的接合部〜50桌与ID列表,并且不希望手工编写的SQL)。

How do I tell spark to use the broadcast hash join via the API?  The issue is that if I load the ID list (from the table persisted via `saveAsTable`) into a `DataFrame` to use in the join, it isn't clear to me if Spark can apply the broadcast hash join.

Answer 1:

你可以明确地标记DataFrame的足够小,使用广播broadcast功能:

Python的

from pyspark.sql.functions import broadcast

small_df = ...
large_df = ...

large_df.join(broadcast(small_df), ["foo"])

或广播的提示(火花> = 2.2):

large_df.join(small_df.hint("broadcast"), ["foo"])

斯卡拉

import org.apache.spark.sql.functions.broadcast

val smallDF: DataFrame = ???
val largeDF: DataFrame = ???

largeDF.join(broadcast(smallDF), Seq("foo"))

或广播的提示(火花> = 2.2):

largeDF.join(smallDF.hint("broadcast"), Seq("foo"))

SQL

您可以使用提示( 星火> = 2.2 ):

SELECT /*+ MAPJOIN(small) */ * 
FROM large JOIN small
ON large.foo = small.foo

要么

SELECT /*+  BROADCASTJOIN(small) */ * 
FROM large JOIN small
ON large.foo = small.foo

要么

SELECT /*+ BROADCAST(small) */ * 
FROM large JOIN small
ON larger.foo = small.foo

R(SparkR):

hint (火花> = 2.2):

join(large, hint(small, "broadcast"), large$foo == small$foo)

broadcast (火花> = 2.3)

join(large, broadcast(small), large$foo == small$foo)

注意

广播加入是有用的,如果结构中的一个相对较小。 否则,它可以显著不是一个完整的洗牌更加昂贵。



Answer 2:

jon_rdd = sqlContext.sql( "select * from people_in_india  p
                            join states s
                            on p.state = s.name")


jon_rdd.toDebugString() / join_rdd.explain() : 

shuffledHashJoin:
所有印度的数据将被洗牌成只有29对每个国家的钥匙。 问题:分片不均匀。 有限的并行与29个的输出分区。

broadcaseHashJoin:

广播小RDD为所有的工作节点。 大RDD的并行性依然维持和洗牌甚至不要求。

PS:图像可能丑陋,但内容丰富。



Answer 3:

随着广播加盟加盟等式的一边被物化和发送到所有映射器。 因此,它被认为是一个地图边加入。

随着数据集越来越物化,并通过网络发送它,如果它相当小唯带来显著的性能提升。

所以,如果你试图执行smallDF.join(largeDF)

等待..!!! 另一个制约因素是,它也需要完全融入每个executor.It的内存也需要适应驾驶员的记忆!

广播变量是使用洪流协议iePeer对等协议和洪流协议的优点执行人之间共享是不依赖于中央实体保持所有块彼此之间的文件的对等体共享块。

上面提到的例子是足以开始广播再现加盟。

注: 创建后不能修改值。 如果你尝试,变化将只在一个节点&



文章来源: Spark SQL broadcast hash join