为什么Spark认为这是一个交叉/笛卡尔积连接

20

我想按以下方式加入数据:

rdd1 = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, 'c')], ['idx', 'val'])
rdd2 = spark.createDataFrame([(1, 2, 1), (1, 3, 0), (2, 3, 1)], ['key1', 'key2', 'val'])

res1 = rdd1.join(rdd2, on=[rdd1['idx'] == rdd2['key1']])
res2 = res1.join(rdd1, on=[res1['key2'] == rdd1['idx']])
res2.show()

然后我遇到了一些错误:

pyspark.sql.utils.AnalysisException: u'笛卡尔积可能会非常昂贵,因此默认情况下禁用。要明确启用它们,请设置spark.sql.crossJoin.enabled = true;'

但我认为这不是一个笛卡尔积。

更新:

res2.explain()

== Physical Plan ==
CartesianProduct
:- *SortMergeJoin [idx#0L, idx#0L], [key1#5L, key2#6L], Inner
:  :- *Sort [idx#0L ASC, idx#0L ASC], false, 0
:  :  +- Exchange hashpartitioning(idx#0L, idx#0L, 200)
:  :     +- *Filter isnotnull(idx#0L)
:  :        +- Scan ExistingRDD[idx#0L,val#1]
:  +- *Sort [key1#5L ASC, key2#6L ASC], false, 0
:     +- Exchange hashpartitioning(key1#5L, key2#6L, 200)
:        +- *Filter ((isnotnull(key2#6L) && (key2#6L = key1#5L)) && isnotnull(key1#5L))
:           +- Scan ExistingRDD[key1#5L,key2#6L,val#7L]
+- Scan ExistingRDD[idx#40L,val#41]
3个回答

20

这是因为您将共享相同血统的结构体 join在一起,从而导致一个平凡相等的条件:

res2.explain()

== Physical Plan ==
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Join Inner, ((idx#204L = key1#209L) && (key2#210L = idx#204L))
:- Filter isnotnull(idx#204L)
:  +- LogicalRDD [idx#204L, val#205]
+- Filter ((isnotnull(key2#210L) && (key2#210L = key1#209L)) && isnotnull(key1#209L))
   +- LogicalRDD [key1#209L, key2#210L, val#211L]
and
LogicalRDD [idx#235L, val#236]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;

In case like this you should use aliases:

from pyspark.sql.functions import col

rdd1 = spark.createDataFrame(...).alias('rdd1')
rdd2 = spark.createDataFrame(...).alias('rdd2')

res1 = rdd1.join(rdd2, col('rdd1.idx') == col('rdd2.key1')).alias('res1')
res1.join(rdd1, on=col('res1.key2') == col('rdd1.idx')).explain()
== Physical Plan ==
*SortMergeJoin [key2#297L], [idx#360L], Inner
:- *Sort [key2#297L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(key2#297L, 200)
:     +- *SortMergeJoin [idx#290L], [key1#296L], Inner
:        :- *Sort [idx#290L ASC NULLS FIRST], false, 0
:        :  +- Exchange hashpartitioning(idx#290L, 200)
:        :     +- *Filter isnotnull(idx#290L)
:        :        +- Scan ExistingRDD[idx#290L,val#291]
:        +- *Sort [key1#296L ASC NULLS FIRST], false, 0
:           +- Exchange hashpartitioning(key1#296L, 200)
:              +- *Filter (isnotnull(key2#297L) && isnotnull(key1#296L))
:                 +- Scan ExistingRDD[key1#296L,key2#297L,val#298L]
+- *Sort [idx#360L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(idx#360L, 200)
      +- *Filter isnotnull(idx#360L)
         +- Scan ExistingRDD[idx#360L,val#361]

For details see SPARK-6459.


2
@user6910411 如果您所说的“同一血统”是指Spark DataFrame的惰性求值和查询计划器存在问题。那么OP的查询从SQL角度来看不是笛卡尔积,对吧? - nir
4
@nir 你可以这么说。简而言之,如果你有从df派生出来的df1df2,并且所有三个都共享col,那么df1.col op df2.col可能会被解析为显然是真或假,即使它在技术上(根据实际解析规则)并不是这样。 - zero323
1
我发现,可以通过使用实际的SQL并通过sparkSession.sql("your sql")执行来避免使用基于数据框架的DSL而完全避免这个问题。 - nir

6

我在第二次连接之前将数据框持续化后,也获得了成功。

类似这样:

res1 = rdd1.join(rdd2, col('rdd1.idx') == col('rdd2.key1')).persist()

res1.join(rdd1, on=col('res1.key2') == col('rdd1.idx'))

2
@GergeSzekely,这对我也起作用了,但我不知道为什么。有什么区别吗? - data princess

4

持久化对我来说没有起作用。

我通过在数据框上使用别名来克服了这个问题。

from pyspark.sql.functions import col

df1.alias("buildings").join(df2.alias("managers"), col("managers.distinguishedName") == col("buildings.manager"))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接