在RDD/Spark DataFrame中,根据特定列删除重复行。

97
假设我有一个相当大的数据集,形式如下:
data = sc.parallelize([('Foo', 41, 'US', 3),
                       ('Foo', 39, 'UK', 1),
                       ('Bar', 57, 'CA', 2),
                       ('Bar', 72, 'CA', 2),
                       ('Baz', 22, 'US', 6),
                       ('Baz', 36, 'US', 6)])

我想根据第一、第三和第四列的值删除重复的行。
完全删除重复行很简单:
data = data.distinct()

要删除第5行或第6行。

但是,如何仅根据第1列、第3列和第4列删除重复行?即删除其中一个:

('Baz', 22, 'US', 6)
('Baz', 36, 'US', 6)

在Python中,可以通过使用.drop_duplicates()来指定列来实现这一点。在Spark/PySpark中如何实现相同的功能?

样本代码是用什么语言编写的?Scala?Python? - undefined
8个回答

139

Pyspark中确实包含了一个dropDuplicates()方法,该方法从1.4版本开始引入。 https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.dropDuplicates.html

>>> from pyspark.sql import Row
>>> df = sc.parallelize([ \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+

>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+

2
有没有一种方法可以捕获它删除的记录? - user422930
2
x = usersDf.drop_duplicates(subset=['DETUserId']) - X 数据框将包含所有已删除的记录 - Rodney
4
@Rodney,这并不是文档上的说法:“返回一个新的DataFrame,其中去除了重复行,可选择只考虑某些列。”https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates - Bas
结果是不确定的,你很可能不想在生产中使用它... - Alessandro S.

28

从您的问题中,我们不清楚您想使用哪些列来确定重复项。解决方案的一般思路是根据识别重复项的列的值创建一个键。然后,您可以使用reduceByKey或reduce操作来消除重复项。

以下是一些代码供您参考:

def get_key(x):
    return "{0}{1}{2}".format(x[0],x[2],x[3])

m = data.map(lambda x: (get_key(x),x))

现在,您有一个由第1、3和4列键控的键值RDD。下一步将是reduceByKeygroupByKeyfilter。这将消除重复项。

r = m.reduceByKey(lambda x,y: (x))

18

我知道你已经接受了其他答案,但如果你想把它作为数据框来处理,只需使用 groupBy 和 agg。假设你已经创建好了一个带有列名称为 "col1"、"col2" 等的 DF,你可以这样做:

myDF.groupBy($"col1", $"col3", $"col4").agg($"col1", max($"col2"), $"col3", $"col4")

请注意,在这种情况下,我选择了col2的最大值,但您也可以选择平均值、最小值等。


2
到目前为止,我的DataFrame经验是它们可以使一切更加优雅和更快。 - David Griffin
1
需要注意的是,此答案是用Scala编写的 - 对于pyspark,请将 $"col1" 替换为 col("col1") 等。 - Daniel Arthur

13

在SparkR中是否有相应的函数? - sag

10
我使用了内置函数dropDuplicates()。Scala代码如下所示:
val data = sc.parallelize(List(("Foo",41,"US",3),
("Foo",39,"UK",1),
("Bar",57,"CA",2),
("Bar",72,"CA",2),
("Baz",22,"US",6),
("Baz",36,"US",6))).toDF("x","y","z","count")

data.dropDuplicates(Array("x","count")).show()

输出:

+---+---+---+-----+
|  x|  y|  z|count|
+---+---+---+-----+
|Baz| 22| US|    6|
|Foo| 39| UK|    1|
|Foo| 41| US|    3|
|Bar| 57| CA|    2|
+---+---+---+-----+

1
问题明确要求使用pyspark实现,而不是scala。 - vaer-k

5
下面的程序将帮助您删除所有重复项,或者如果您想基于某些列删除重复项,则可以轻松实现:
import org.apache.spark.sql.SparkSession

object DropDuplicates {
def main(args: Array[String]) {
val spark =
  SparkSession.builder()
    .appName("DataFrame-DropDuplicates")
    .master("local[4]")
    .getOrCreate()

import spark.implicits._

// create an RDD of tuples with some data
val custs = Seq(
  (1, "Widget Co", 120000.00, 0.00, "AZ"),
  (2, "Acme Widgets", 410500.00, 500.00, "CA"),
  (3, "Widgetry", 410500.00, 200.00, "CA"),
  (4, "Widgets R Us", 410500.00, 0.0, "CA"),
  (3, "Widgetry", 410500.00, 200.00, "CA"),
  (5, "Ye Olde Widgete", 500.00, 0.0, "MA"),
  (6, "Widget Co", 12000.00, 10.00, "AZ")
)
val customerRows = spark.sparkContext.parallelize(custs, 4)

// convert RDD of tuples to DataFrame by supplying column names
val customerDF = customerRows.toDF("id", "name", "sales", "discount", "state")

println("*** Here's the whole DataFrame with duplicates")

customerDF.printSchema()

customerDF.show()

// drop fully identical rows
val withoutDuplicates = customerDF.dropDuplicates()

println("*** Now without duplicates")

withoutDuplicates.show()

val withoutPartials = customerDF.dropDuplicates(Seq("name", "state"))

println("*** Now without partial duplicates too")

withoutPartials.show()

 }
 }

“// 删除完全相同的行”这个注释第一次是正确的,第二次是错误的。可能是复制/粘贴错误? - Joshua Stafford
1
感谢@JoshuaStafford,已删除不当评论。 - Sampat Kumar

0

所有上述方法都很好,我认为dropduplicates是最好的方法。

以下是另一种方法(使用group by agg等)在不使用dropduplicates的情况下去重, 但如果你注意时间/性能,按列dropduplicates是最佳选择(时间:1563毫秒)。

以下是完整列表和时间。

import org.apache.spark.sql.SparkSession

object DropDups {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ReadFromUrl")
      .master("local[*]")
      .getOrCreate()

    val sc = spark.sparkContext
    import spark.implicits._
    spark.sparkContext.setLogLevel("Error")
    val data = sc.parallelize(List(
      ("Foo", 41, "US", 3),
      ("Foo", 39, "UK", 1),
      ("Bar", 57, "CA", 2),
      ("Bar", 72, "CA", 2),
      ("Baz", 22, "US", 6),
      ("Baz", 36, "US", 6)
    )).toDF("x", "y", "z", "count")

    spark.time
    {
      import org.apache.spark.sql.functions.first
      val data = sc.parallelize(List(
        ("Foo", 41, "US", 3),
        ("Foo", 39, "UK", 1),
        ("Bar", 57, "CA", 2),
        ("Bar", 72, "CA", 2),
        ("Baz", 22, "US", 6),
        ("Baz", 36, "US", 6)
      )).toDF("x", "y", "z", "count")

      val deduped = data
        .groupBy("x", "count")
        .agg(
          first("y").as("y"),
          first("z").as("z")
        )
      deduped.show()
    }
    spark.time {
      data.dropDuplicates(Array("x","count")).show()
    }
    spark.stop()
  }
}
  

结果:

 +---+-----+---+---+
|  x|count|  y|  z|
+---+-----+---+---+
|Baz|    6| 22| US|
|Foo|    1| 39| UK|
|Bar|    2| 57| CA|
|Foo|    3| 41| US|
+---+-----+---+---+

Time taken: 7086 ms
+---+---+---+-----+
|  x|  y|  z|count|
+---+---+---+-----+
|Baz| 22| US|    6|
|Foo| 39| UK|    1|
|Bar| 57| CA|    2|
|Foo| 41| US|    3|
+---+---+---+-----+

Time taken: 1563 ms


-4

这是我的Df,包含4重复出现了两次,因此在这里将删除重复的值。

scala> df.show
+-----+
|value|
+-----+
|    1|
|    4|
|    3|
|    5|
|    4|
|   18|
+-----+

scala> val newdf=df.dropDuplicates

scala> newdf.show
+-----+
|value|
+-----+
|    1|
|    3|
|    5|
|    4|
|   18|
+-----+

你可以在 spark-shell 中检查,我已经分享了正确的输出。这个答案涉及到如何在列或数据框中去除重复值。 - Nilesh Shinde
你能提供一个基于OP问题的例子吗? - Alex
我在我的回答中已经给出了例子,你可以参考那个。 - Nilesh Shinde
你的帖子对这个讨论没有任何价值。@vaerek已经发布了一个PySpark df.dropDuplicates()示例,包括如何将其应用于多个列(我的初始问题)。 - Jason
这个句子无法理解。请添加缺失的标点、缺失的单词等。提前感谢。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接