如何根据另一个DataFrame的列删除DataFrame中的行?

4

我正在尝试在Spark 1.6.1中使用SQLContext.subtract(),根据另一个数据框架中的列删除数据框架中的行。让我们举个例子:

from pyspark.sql import Row

df1 = sqlContext.createDataFrame([
    Row(name='Alice', age=2),
    Row(name='Bob', age=1),
]).alias('df1')

df2 = sqlContext.createDataFrame([
    Row(name='Bob'),
])

df1_with_df2 = df1.join(df2, 'name').select('df1.*')
df1_without_df2 = df1.subtract(df1_with_df2)

由于我想要从df1中获取除name='Bob'以外的所有行,因此我期望得到Row(age=2,name='Alice')。但是我也检索到了Bob:

print(df1_without_df2.collect())
# [Row(age='1', name='Bob'), Row(age='2', name='Alice')]

经过多次实验,我找到了这个MCVE,发现问题出在age键上。如果我省略它:

df1_noage = sqlContext.createDataFrame([
    Row(name='Alice'),
    Row(name='Bob'),
]).alias('df1_noage')

df1_noage_with_df2 = df1_noage.join(df2, 'name').select('df1_noage.*')
df1_noage_without_df2 = df1_noage.subtract(df1_noage_with_df2)
print(df1_noage_without_df2.collect())
# [Row(name='Alice')]

然后我只得到了预期中的Alice。我做出的最奇怪的观察是,只要它们在连接中使用的键之后(按字典顺序意义上),就可以添加键:

df1_zage = sqlContext.createDataFrame([
    Row(zage=2, name='Alice'),
    Row(zage=1, name='Bob'),
]).alias('df1_zage')

df1_zage_with_df2 = df1_zage.join(df2, 'name').select('df1_zage.*')
df1_zage_without_df2 = df1_zage.subtract(df1_zage_with_df2)
print(df1_zage_without_df2.collect())
# [Row(name='Alice', zage=2)]

我正确地得到了Alice(和她的zage)!在我的实例中,我对所有列感兴趣,而不仅仅是在name之后的那些。

1个回答

2

这里有一些bug(第一个问题看起来与SPARK-6231的问题相关),使用JIRA看起来是个好主意,但是SUBTRACT/EXCEPT不适合于部分匹配。

相反,在Spark 2.0及以上版本中,您可以使用反向连接:

df1.join(df1_with_df2, ["name"], "leftanti").show()

在1.6版本中,您可以使用标准外连接做类似的事情:
import pyspark.sql.functions as F

ref = df1_with_df2.select("name").alias("ref")

(df1
    .join(ref, ref.name == df1.name, "leftouter")
    .filter(F.isnull("ref.name"))
    .drop(F.col("ref.name")))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接