Spark SQL 广播哈希连接

16
我正在尝试使用SparkSQL在数据框之间执行广播哈希连接,如此处所述:https://docs.cloud.databricks.com/docs/latest/databricks_guide/06%20Spark%20SQL%20%26%20DataFrames/05%20BroadcastHashJoin%20-%20scala.html。在该示例中,(小)DataFrame通过saveAsTable进行持久化,然后通过spark SQL进行连接(即通过sqlContext.sql("...")))。我遇到的问题是我需要使用sparkSQL API来构建我的SQL(我要将约50个表与ID列表左连接,不想手写SQL)。
How do I tell spark to use the broadcast hash join via the API?  The issue is that if I load the ID list (from the table persisted via `saveAsTable`) into a `DataFrame` to use in the join, it isn't clear to me if Spark can apply the broadcast hash join.
3个回答

48
您可以使用broadcast函数将DataFrame明确标记为足够小以进行广播:

Python

from pyspark.sql.functions import broadcast

small_df = ...
large_df = ...

large_df.join(broadcast(small_df), ["foo"])

或广播提示(Spark >= 2.2):

large_df.join(small_df.hint("broadcast"), ["foo"])

Scala:

import org.apache.spark.sql.functions.broadcast

val smallDF: DataFrame = ???
val largeDF: DataFrame = ???

largeDF.join(broadcast(smallDF), Seq("foo"))

或广播提示(Spark >= 2.2):
largeDF.join(smallDF.hint("broadcast"), Seq("foo"))

SQL

您可以使用提示 (Spark >= 2.2):

SELECT /*+ MAPJOIN(small) */ * 
FROM large JOIN small
ON large.foo = small.foo

或者

SELECT /*+  BROADCASTJOIN(small) */ * 
FROM large JOIN small
ON large.foo = small.foo

或者

SELECT /*+ BROADCAST(small) */ * 
FROM large JOIN small
ON larger.foo = small.foo

R (SparkR):

使用 hint (Spark >= 2.2):

join(large, hint(small, "broadcast"), large$foo == small$foo)

使用broadcast (Spark >= 2.3)

join(large, broadcast(small), large$foo == small$foo)

注意:

如果其中一个结构相对较小,则广播连接非常有用。否则,它可能比完整洗牌显着更加昂贵。


3
谢谢!经过一些尝试,我发现 smallDF.join(largeDF) 并没有使用广播哈希连接,但 largeDF.join(smallDF) 是这样的。 - user1759848
那么,对于小的数据框和大的数据框,执行 smallDF.join(LargeDF, "right_outer") 有意义吗? - vincwng
@vincwng smallDF.join(LargeDF, "right_outer") 可以根据 spark.sql.autoBroadcastJoinThreshold 自动进行广播,而 broadcast 函数可以应用于任何位置:broadcast(largeDF).join(smallDF, Seq("foo")) - zero323
在使用 SQL API 处理两个小表和一个大表时,是否有办法传递广播提示?谢谢! - knowledge_seeker

6
jon_rdd = sqlContext.sql( "select * from people_in_india  p
                            join states s
                            on p.state = s.name")


jon_rdd.toDebugString() / join_rdd.explain() : 

shuffledHashJoin :
所有印度的数据将被分配到每个州仅有的29个键中。 问题: 不均匀分片。 只有29个输出分区,并行性受限。

broadcastHashJoin:

向所有工作节点广播小RDD。 大RDD的并行性仍得以保持且无需洗牌。进入图像描述

PS:图片可能不美观但信息量大。


3

使用广播连接时,加入等式的一边被实现并发送到所有映射器。因此,这被认为是一个映射边连接。

随着数据集被实现并通过网络发送,它只会带来显著的性能提升,如果数据集相对较小。

所以,如果您尝试执行 smallDF.join(largeDF)

等一下..!!! 另一个限制是它还需要完全适合每个executor的内存中。它也需要适合驱动程序的内存!

广播变量使用Torrent协议即点对点协议在executors之间共享,Torrent协议的优点是对等方彼此共享文件块,而不依赖于中央实体持有所有块。

上述示例足以开始使用广播连接。

注意:创建后不能修改值。如果尝试更改,更改仅在一个节点上进行


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接