PySpark - 获取重复行的索引

7

假设我有一个PySpark数据框,如下所示:

+--+--+--+--+
|a |b |c |d |
+--+--+--+--+
|1 |0 |1 |2 |
|0 |2 |0 |1 |
|1 |0 |1 |2 |
|0 |4 |3 |1 |
+--+--+--+--+

如何创建一个标记所有重复行的列,如下所示:
+--+--+--+--+--+
|a |b |c |d |e |
+--+--+--+--+--+
|1 |0 |1 |2 |1 |
|0 |2 |0 |1 |0 |
|1 |0 |1 |2 |1 |
|0 |4 |3 |1 |0 |
+--+--+--+--+--+

我尝试使用groupBy和aggregate函数,但没有成功。


2
不是完全相同,但这个答案是一种方法。尝试:df.groupBy(df.columns).count().show() - pault
5个回答

14

只是为了扩展一下我的评论:

您可以按所有列分组,并使用pyspark.sql.functions.count()来确定某一列是否重复:

import pyspark.sql.functions as f
df.groupBy(df.columns).agg((f.count("*")>1).cast("int").alias("e")).show()
#+---+---+---+---+---+
#|  a|  b|  c|  d|  e|
#+---+---+---+---+---+
#|  1|  0|  1|  2|  1|
#|  0|  2|  0|  1|  0|
#|  0|  4|  3|  1|  0|
#+---+---+---+---+---+

在这里,我们使用count("*") > 1作为聚合函数,并将结果转换为intgroupBy()将导致删除重复行。 根据您的需求,这可能已经足够了。

但是,如果您想保留所有行,可以使用其他答案中显示的Window函数或使用join()

df.join(
    df.groupBy(df.columns).agg((f.count("*")>1).cast("int").alias("e")),
    on=df.columns,
    how="inner"
).show()
#+---+---+---+---+---+
#|  a|  b|  c|  d|  e|
#+---+---+---+---+---+
#|  1|  0|  1|  2|  1|
#|  1|  0|  1|  2|  1|
#|  0|  2|  0|  1|  0|
#|  0|  4|  3|  1|  0|
#+---+---+---+---+---+

在这里,我们将原始数据框与上面groupBy()的结果数据框按所有列进行内部连接。


8

定义一个window函数,用于检查按所有列分组时的行数是否大于1。如果是,则为重复项(1),否则不是重复项(0)。

allColumns = df.columns
import sys
from pyspark.sql import functions as f
from pyspark.sql import window as w
windowSpec = w.Window.partitionBy(allColumns).rowsBetween(-sys.maxint, sys.maxint)

df.withColumn('e', f.when(f.count(f.col('d')).over(windowSpec) > 1, f.lit(1)).otherwise(f.lit(0))).show(truncate=False) 

这应该给你

+---+---+---+---+---+
|a  |b  |c  |d  |e  |
+---+---+---+---+---+
|1  |0  |1  |2  |1  |
|1  |0  |1  |2  |1  |
|0  |2  |0  |1  |0  |
|0  |4  |3  |1  |0  |
+---+---+---+---+---+

我希望答案有所帮助。 更新 正如 @pault 评论 的所述,您可以通过将 boolean 强制转换为 integer 来消除 whencollit
df.withColumn('e', (f.count('*').over(windowSpec) > 1).cast('int')).show(truncate=False)

2
这里不需要使用whencollit- 你可以将条件转换为整数:df.withColumn('e', (f.count('*').over(windowSpec) > 1).cast('int')).show(truncate=False) - pault
1
谢谢@pault的评论 :) 真的很有帮助,我已经把你的评论包含在我的答案中了。 - Ramesh Maharjan

2

我认为pandas_udf可以更容易地处理这个问题。首先,您需要创建一个pandas UDF,它接受一个Series并对重复行返回True。然后,只需使用withColumn标记重复行即可。以下是我建议的代码:

@pandas_udf('boolean')
def duplicate_finder(s: pd.Series) -> pd.Series:
    return s.duplicated(keep=False)

df.withColumn('Duplicated', duplicate_finder('DESIRED_COLUMN')).show()

1

使用所有列对数据框进行分区,然后应用dense_rank函数。

import sys
from pyspark.sql.functions import dense_rank
from pyspark.sql import window as w

df.withColumn('e', dense_rank().over(w.Window.partitionBy(df.columns))).show()

1
AnalysisException: u'Window function dense_rank() requires window to be ordered - pault

0

df1 = df_interr.groupBy("Item_group", "Item_name", "price").count().filter("count > 1")


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接