在Spark数组列中删除重复项

8

I have a given DataSet :

+-------------------+--------------------+
|               date|            products|
+-------------------+--------------------+
|2017-08-31 22:00:00|[361, 361, 361, 3...|
|2017-09-22 22:00:00|[361, 362, 362, 3...|
|2017-09-21 22:00:00|[361, 361, 361, 3...|
|2017-09-28 22:00:00|[360, 361, 361, 3...|

其中,products列是一个包含可能重复项的字符串数组。

我想要删除这些重复项(在同一行内)。

我的做法基本上是编写了一个UDF函数,如下:

 val removeDuplicates: WrappedArray[String] => WrappedArray[String] = _.distinct
 val udfremoveDuplicates = udf(removeDuplicates)

这个解决方案给我带来了正确的结果:
+-------------------+--------------------+--------------------+
|               date|            products|       rm_duplicates|
+-------------------+--------------------+--------------------+
|2017-08-31 22:00:00|[361, 361, 361, 3...|[361, 362, 363, 3...|
|2017-09-22 22:00:00|[361, 362, 362, 3...|[361, 362, 363, 3...|

我的问题是:

  1. Spark提供了更好/更有效的方法来获取此结果吗?

  2. 我正在考虑使用map,但如何将所需列作为列表获取,以便像我的removeDuplicates lambda中一样使用“distinct”方法?

编辑:我标记了这个主题的Java标签,因为对我来说无论在哪种语言(Scala或Java)中获得答案都没有关系:) 编辑2:错字


2
Spark没有提供这种类型操作的内置函数,所以像@user6910411所说的那样,UDF是解决问题的方法。如果你想要一个List,你可以在distinct后面添加.toList,并更新你的UDF类型注释以返回一个List。 - eliasah
1
映射比数组更昂贵且应避免使用,除非真正需要,例如频繁检查元素的存在性以及元素集合的平均大小相当大时(或需要合并映射等)。即使在这种情况下,如果您需要检查元素的存在性,则通常最快的方法是将元素表示为分隔符字符串,例如 ":123:345:126:" ,并搜索 <delimiter><element><delimiter> 的子字符串。即使是数组等复杂数据结构也需要比字符串处理更多的处理。 - Sim
4个回答

4

3

在这个问题中,使用UDF的方法是最好的方法,因为spark-sql没有内置原语来使数组唯一化。

如果你正在处理大量数据和/或数组值具有唯一属性,则值得考虑UDF的实现。

WrappedArray.distinct在幕后构建了一个mutable.HashSet,然后遍历它以构建不同元素的数组。从性能角度来看,这可能存在两个问题:

  1. Scala的可变集合效率并不高,这就是为什么在Spark的核心中你会发现很多Java集合和while循环。如果你需要极致的性能,可以使用更快的数据结构实现自己的通用distinct。

  2. 通用实现的distinct不能利用数据的任何属性。例如,如果平均数组很小,那么直接构建到数组中并对重复项进行线性搜索的简单实现可能比构建复杂数据结构的代码表现更好,尽管理论上它的复杂度是O(n^2)。另一个例子,如果值只能是来自小范围的数字或字符串集,可以通过位集实现唯一性。

同样,这些策略仅在你有大量数据的情况下才应考虑。对于几乎所有情况,你的简单实现都很合适。


1

您可以使用一个简单的UDF。

val dedup = udf((colName: scala.collection.mutable.WrappedArray[String]) => colName.distinct)
    
df.withColumn("DeDupColumn", dedup($"colName"))

0

假设你当前的数据框架 模式如下所示:

root
 |-- date: string (nullable = true)
 |-- products: array (nullable = true)
 |    |-- element: integer (containsNull = false)

您可以使用以下方法来删除重复项。
df.map(row => DuplicateRemoved(row(0).toString, row(1).asInstanceOf[mutable.WrappedArray[Int]], row(1).asInstanceOf[mutable.WrappedArray[Int]].distinct)).toDF()

当然,你需要一个case class来实现这个。
case class DuplicateRemoved(date: String, products: mutable.WrappedArray[Int], rm_duplicates: mutable.WrappedArray[Int])

你应该得到以下输出。
+-------------------+------------------------------+-------------------------+
|date               |products                      |rm_duplicates            |
+-------------------+------------------------------+-------------------------+
|2017-08-31 22:00:00|[361, 361, 361, 362, 363, 364]|[361, 362, 363, 364]     |
|2017-09-22 22:00:00|[361, 362, 362, 362, 363, 364]|[361, 362, 363, 364]     |
|2017-09-21 22:00:00|[361, 361, 361, 362, 363, 364]|[361, 362, 363, 364]     |
|2017-09-28 22:00:00|[360, 361, 361, 362, 363, 364]|[360, 361, 362, 363, 364]|
+-------------------+------------------------------+-------------------------+

我希望答案对你有帮助

1
UDF方法更简单、更快、更好。这个答案涉及到不安全的Row访问和不必要的case class创建,然后再将其转换回Row。它处理整行数据,这会阻止Spark执行列优化或计划重写,如果这是一个更大的转换DAG的一部分的话。(另外,顺便说一下,问题中提到需要去重的数组包含字符串;可能需要修复一下。) - Sim
是的 @Sim,你完全正确。原帖想看看使用UDF函数完成同一任务的其他可能性,这就是我发布此答案的原因,以展示它是可能的。 :) - Ramesh Maharjan
1
确切的问题是:“Spark是否提供了更好/更有效的方法来获得这个结果?”在软件中解决问题有无数种比给定解决方案更糟糕和低效的方法。 :) - Sim
我同意@Sim Ramesh的观点,这不是一个好的解决方案,而且OP要求更有效率的方法。很抱歉,我会给你的答案点个踩。 - eliasah

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接