DataFrame连接优化 - 广播哈希连接

50

我正在尝试有效地将两个DataFrame连接起来,其中一个较大,而另一个稍微小一些。

有没有办法避免所有这些洗牌呢?我不能设置autoBroadCastJoinThreshold,因为它只支持整数 - 而我正在尝试广播的表略大于整数字节。

有没有一种方法强制广播忽略此变量?


感觉你实际上的问题是“有没有一种方法可以强制广播忽略这个变量?” 它可以通过我下面提到的属性进行控制。由于没有人回答,为了使它相关,我给出了这个晚回答。希望能帮到你! - Ram Ghadiyaram
请接受一个答案作为被采纳的答案,这也将成为其他人的指针。谢谢! - Ram Ghadiyaram
请接受一个答案作为被采纳的答案。这也将成为其他人的指针。谢谢! - Ram Ghadiyaram
6个回答

94

广播哈希连接(类似于MapReduce中的map side join或map-side combine):

在SparkSQL中,您可以通过调用queryExecution.executedPlan来查看正在执行的连接类型。与核心Spark一样,如果其中一个表比另一个表小得多,则可能需要使用广播哈希连接。您可以通过在连接之前对DataFrame调用方法broadcast来提示Spark SQL应该将给定的DF广播到连接上。

示例: largedataframe.join(broadcast(smalldataframe), "key")

在DWH术语中,其中largedataframe可能类似于fact
smalldataframe可能类似于dimension

如我最喜欢的书(HPS)所述,请参见下文以了解更好的理解。 enter image description here

请注意:broadcast是从import org.apache.spark.sql.functions.broadcast而不是从SparkContext导入的。

Spark还会自动使用spark.sql.conf.autoBroadcastJoinThreshold来确定是否应该广播表。

提示:请查看DataFrame.explain()方法

def
explain(): Unit
Prints the physical plan to the console for debugging purposes.

有没有一种方法可以强制忽略这个变量进行广播?

sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")


注意:

关于Hive(而不是Spark)的另一个类似性质的注释:可以使用Hive提示符MAPJOIN来实现类似的操作,如下所示...

Select /*+ MAPJOIN(b) */ a.key, a.value from a join b on a.key = b.key

hive> set hive.auto.convert.join=true;
hive> set hive.auto.convert.join.noconditionaltask.size=20971520
hive> set hive.auto.convert.join.noconditionaltask=true;
hive> set hive.auto.convert.join.use.nonstaged=true;
hive> set hive.mapjoin.smalltable.filesize = 30000000; // default 25 mb made it as 30mb

进一步阅读:请参考我的关于BHJ、SHJ、SMJ的文章


当我想要执行smallDF.join(broadcast(largeDF, "left_outer"))时,将largeDF.join(broadcast(smallDF), "right_outer")是否有意义?因为小的数据框应该保存在内存中,而不是大的数据框。 - vincwng
请参考SparkStrategies.scala中列举的所有情况。这不是单行写在此处的内容...将left_outer更改为right_outer会导致结果发生变化。 - Ram Ghadiyaram
但在正常情况下,Table1的LEFT OUTER JOIN Table2和Table2的RIGHT OUTER JOIN Table1是相等的。 - Ram Ghadiyaram

23

您可以使用left.join(broadcast(right),...)提示数据框进行广播。


2
这个广播的正确导入方式是什么?我发现 broadcast 这个符号无法解析。 - Chris A.
2
它位于org.apache.spark.sql.functions下,您需要使用Spark 1.5.0或更高版本。 - Sebastian Piu
3
有没有办法在 SQL 语句中提示广播连接? - Tagar
你确实可以在SQL语句中使用提示,但不确定其有效程度。例如,我已经像left inner join broadcast(right)这样使用过它,但不确定它是否适用于子查询。 - Sebastian Piu
在SparkSQL中进行广播,您可以使用以下方法: val df = broadcast(spark.table("tableA")).createTempView("tableAView") spark.sql("SELECT ... FROM tableAView a JOIN tableB b") - Alexander Tronchin-James
自Spark 2.2.0起,不再支持Hints,请参见此处:https://issues.apache.org/jira/browse/SPARK-16475 - TaylerJones

7

3
这是spark的一个当前限制,详情请参见SPARK-6235。2GB的限制同样适用于广播变量。
您确定没有其他好的方法来解决这个问题吗?例如不同的分区?
否则,您可以通过手动创建多个广播变量来绕过此限制,每个广播变量都小于2GB。

1
我已经成功将一个较小的表格大小缩减到略低于2 GB,但似乎广播仍未发生(autoBroadcast无法选择它)。不过对于小表格(100 MB)来说,它可以正常工作。 - NNamed

1

我发现这段代码适用于Spark 2.11版本2.0.0中的广播连接。

import org.apache.spark.sql.functions.broadcast  

val employeesDF = employeesRDD.toDF
val departmentsDF = departmentsRDD.toDF

// materializing the department data
val tmpDepartments = broadcast(departmentsDF.as("departments"))

import context.implicits._

employeesDF.join(broadcast(tmpDepartments), 
   $"depId" === $"id",  // join by employees.depID == departments.id 
   "inner").show()

以上代码的参考文献为 Henning Kropp博客,使用Spark进行广播连接


0

使用连接提示将优先于配置autoBroadCastJoinThreshold,因此使用提示将始终忽略该阈值。

此外,在使用连接提示时,自适应查询执行(自Spark 3.x以来)也不会更改提示中给定的策略。

Spark SQL中,您可以按如下所示应用连接提示:

SELECT /*+ BROADCAST */ a.id, a.value FROM a JOIN b ON a.id = b.id

SELECT /*+ BROADCASTJOIN */ a.id, a.value FROM a JOIN b ON a.id = b.id

SELECT /*+ MAPJOIN */ a.id, a.value FROM a JOIN b ON a.id = b.id

请注意,关键字BROADCAST、BROADCASTJOIN和MAPJOIN在hints.scala代码中都是别名。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接