使用Pyspark根据另一个数据框的列筛选数据框。

47

我不确定为什么这个问题对我来说很困难,因为在R或pandas中做起来非常简单。但是我想避免使用pandas,因为我正在处理大量数据,并且我相信toPandas()会将所有数据加载到pyspark驱动程序的内存中。

我有两个数据框:df1df2。我希望过滤df1(删除所有行),其中df1.userid = df2.userid并且df1.group = df2.group。我不确定是否应该使用filter()join()sql。例如:

df1:
+------+----------+--------------------+
|userid|   group  |      all_picks     |
+------+----------+--------------------+
|   348|         2|[225, 2235, 2225]   |
|   567|         1|[1110, 1150]        |
|   595|         1|[1150, 1150, 1150]  |
|   580|         2|[2240, 2225]        |
|   448|         1|[1130]              |
+------+----------+--------------------+

df2:
+------+----------+---------+
|userid|   group  |   pick  |
+------+----------+---------+
|   348|         2|     2270|
|   595|         1|     2125|
+------+----------+---------+

Result I want:
+------+----------+--------------------+
|userid|   group  |      all_picks     |
+------+----------+--------------------+
|   567|         1|[1110, 1150]        |
|   580|         2|[2240, 2225]        |
|   448|         1|[1130]              |
+------+----------+--------------------+

编辑: 我尝试了许多join()和filter()函数,我相信最接近的是:

cond = [df1.userid == df2.userid, df2.group == df2.group]
df1.join(df2, cond, 'left_outer').select(df1.userid, df1.group, df1.all_picks) # Result has 7 rows

我尝试了许多不同的连接类型,也尝试了不同的

cond values:
    cond = ((df1.userid == df2.userid) & (df2.group == df2.group)) # result has 7 rows
    cond = ((df1.userid != df2.userid) & (df2.group != df2.group)) # result has 2 rows

然而,现在情况似乎是连接操作正在新增行,而非删除。

我使用的是python 2.7spark 2.1.0

1个回答

97

左反连接就是您要查找的内容:

df1.join(df2, ["userid", "group"], "leftanti")

但是可以使用左外连接完成相同的操作:

(df1
    .join(df2, ["userid", "group"], "leftouter")
    .where(df2["pick"].isNull())
    .drop(df2["pick"]))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接