我想按以下方式加入数据:
rdd1 = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, 'c')], ['idx', 'val'])
rdd2 = spark.createDataFrame([(1, 2, 1), (1, 3, 0), (2, 3, 1)], ['key1', 'key2', 'val'])
res1 = rdd1.join(rdd2, on=[rdd1['idx'] == rdd2['key1']])
res2 = res1.join(rdd1, on=[res1['key2'] == rdd1['idx']])
res2.show()
然后我遇到了一些错误:
pyspark.sql.utils.AnalysisException: u'笛卡尔积可能会非常昂贵,因此默认情况下禁用。要明确启用它们,请设置spark.sql.crossJoin.enabled = true;'
但我认为这不是一个笛卡尔积。
更新:
res2.explain()
== Physical Plan ==
CartesianProduct
:- *SortMergeJoin [idx#0L, idx#0L], [key1#5L, key2#6L], Inner
: :- *Sort [idx#0L ASC, idx#0L ASC], false, 0
: : +- Exchange hashpartitioning(idx#0L, idx#0L, 200)
: : +- *Filter isnotnull(idx#0L)
: : +- Scan ExistingRDD[idx#0L,val#1]
: +- *Sort [key1#5L ASC, key2#6L ASC], false, 0
: +- Exchange hashpartitioning(key1#5L, key2#6L, 200)
: +- *Filter ((isnotnull(key2#6L) && (key2#6L = key1#5L)) && isnotnull(key1#5L))
: +- Scan ExistingRDD[key1#5L,key2#6L,val#7L]
+- Scan ExistingRDD[idx#40L,val#41]
df
派生出来的df1
和df2
,并且所有三个都共享col
,那么df1.col op df2.col
可能会被解析为显然是真或假,即使它在技术上(根据实际解析规则)并不是这样。 - zero323sparkSession.sql("your sql")
执行来避免使用基于数据框架的DSL而完全避免这个问题。 - nir