如何在Spark中实现“交叉连接”?

14

我们计划将Apache Pig代码迁移到新的Spark平台。

Pig有“Bag / Tuple / Field”概念,类似于关系型数据库。 Pig提供了对CROSS / INNER / OUTER联接的支持。

对于CROSS JOIN,我们可以使用alias = CROSS alias, alias [, alias …] [PARTITION BY partitioner] [PARALLEL n];

但是,随着我们迁移到Spark平台,我找不到Spark API中的相应功能。 您有什么想法吗?


它还没有准备好,但是目前正在构建spork(基于spark的pig),因此您可能不需要更改任何代码。 - aaronman
2个回答

25

这是oneRDD.cartesian(anotherRDD)


谢谢,笛卡尔积是交叉连接的别名。 - Shawn Guo

4
以下是针对Spark 2.x数据集和数据框的推荐版本:
scala> val ds1 = spark.range(10)
ds1: org.apache.spark.sql.Dataset[Long] = [id: bigint]

scala> ds1.cache.count
res1: Long = 10

scala> val ds2 = spark.range(10)
ds2: org.apache.spark.sql.Dataset[Long] = [id: bigint]

scala> ds2.cache.count
res2: Long = 10

scala> val crossDS1DS2 = ds1.crossJoin(ds2)
crossDS1DS2: org.apache.spark.sql.DataFrame = [id: bigint, id: bigint]

scala> crossDS1DS2.count
res3: Long = 100

或者使用传统的JOIN语法而不需要JOIN条件。使用这个配置选项可以避免后面出现的错误。

spark.conf.set("spark.sql.crossJoin.enabled", true)

当省略该配置时出现错误(特别是使用“join”语法时):
scala> val crossDS1DS2 = ds1.join(ds2)
crossDS1DS2: org.apache.spark.sql.DataFrame = [id: bigint, id: bigint]

scala> crossDS1DS2.count
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
...
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;

相关:Spark 2.x的spark.sql.crossJoin.enabled

(该参数用于启用或禁用Spark SQL中的跨连接操作。)

当您执行数据集联接时,结果会生成一个DataFrame,但我希望它应该是另一个数据集...为什么不使用joinWith呢? - Dan Ciborowski - MSFT
好眼力,丹!这个例子只是为了说明交叉连接语义,所以使用joinWith来获取一个数据集并不是首要考虑的。我会更新答案,但你的问题开启了另一条关于crossJoin方法返回DF而不是DS的探究线路,让用户使用joinWith和配置选项来保持他们的DS,嗯。 - Garren S
在我看来,如果要使用joinWith并进行交叉连接,您必须使用两个相互矛盾的语句将其联合到整个数据集中,我想这是为了确保您确实想要执行交叉连接。 - Dan Ciborowski - MSFT

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接