PySpark 动态连接条件

3

我有一列主键。因为每个表的主键数量可能会改变,所以我存储了主键。我想要基于 pk_list 中的列将两个数据帧连接起来。

pk_list=['col1',col2', .... 'coln']

现在我的代码看起来是这样的:

full_load_tbl_nc = full_load_tbl.join(delta_load_tbl, (col(f) == col(s) for (f,s) in zip(pk_list,pk_list) ) , "leftanti")

当我运行代码时,出现以下错误:

使用 import from pyspark.sql.functions import col 将列表转换为pyspark.sql.column也会导致col(pk_list)失败。 文件 "/mnt/yarn/usercache/root/appcache/application_1544185829274_0001/container_1544185829274_0001_01_000001/pyspark.zip/pyspark/sql/dataframe.py" 的第818行,join函数抛出AssertionError:on应该是Column或Column列表。


这个问题需要一个 [MCVE]。 - undefined
2
joinpyspark 中接受一个列列表作为参数来进行连接。你不能只简单地执行 full_load_tbl.join(delta_load_tbl, pk_list, how="leftanti") 吗? - undefined
我之前不知道这个事实,它会和列表中的所有列一起工作。我测试过了,它确实有效。非常感谢! - undefined
1个回答

1
你需要传递一个键值列表以进行连接:尝试以下代码。
DF1_Columns = ['col1',col2']
DF2_Columns = ['Col11', 'Col22']
result = DF1.join(DF2, ([col(f) == col(s) for (f,s) in zip(DF1_Columns ,DF2_Columns )]) , "type")

如果两个数据框的关键列名称相似,您可以按如下方式编写相同的连接语句:
result= DF1.join(DF2, ([col(column) == col(column) for column in DF1_Columns ]) , "type")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接