对一个Spark DataFrame列进行按日期排序的数组

5

我有一个如下所示的数据框:

+---+------------------------------------------------------+
|Id |DateInfos                                             |
+---+------------------------------------------------------+
|B  |[[3, 19/06/2012-02.42.01], [4, 17/06/2012-18.22.21]]  |
|A  |[[1, 15/06/2012-18.22.16], [2, 15/06/2012-09.22.35]]  |
|C  |[[5, 14/06/2012-05.20.01]]                            |
+---+------------------------------------------------------+

我想按照数组第二个元素中的时间戳,对DateInfos列中的每个元素进行日期排序。
+---+------------------------------------------------------+
|Id |DateInfos                                             |
+---+------------------------------------------------------+
|B  |[[4, 17/06/2012-18.22.21], [3, 19/06/2012-02.42.01]]  |
|A  |[[2, 15/06/2012-09.22.35], [1, 15/06/2012-18.22.16]]  |
|C  |[[5, 14/06/2012-05.20.01]]                            |
+---+------------------------------------------------------+

我的DataFrame的结构如下所示:

root
 |-- C1: string (nullable = true)
 |-- C2: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: string (nullable = false)

我假设我需要创建一个udf,使用以下签名的函数:

def sort_by_date(mouvements : Array[Any]) : Array[Any]

你有任何想法吗?

1个回答

8

这确实有点棘手——因为虽然UDF的输入和输出类型看起来相同,但我们不能真正地定义它——因为输入实际上是一个mutable.WrappedArray[Row],而输出不能使用Row,否则Spark将无法将其解码成一个 Row...

所以我们定义一个接受mutable.WrappedArray[Row]并返回Array[(Int, String)]的UDF:

val sortDates = udf { arr: mutable.WrappedArray[Row] =>
  arr.map { case Row(i: Int, s: String) => (i, s) }.sortBy(_._2)
}

val result = input.select($"Id", sortDates($"DateInfos") as "DateInfos")

result.show(truncate = false)
// +---+--------------------------------------------------+
// |Id |DateInfos                                         |
// +---+--------------------------------------------------+
// |B  |[[4,17/06/2012-18.22.21], [3,19/06/2012-02.42.01]]|
// |A  |[[2,15/06/2012-09.22.35], [1,15/06/2012-18.22.16]]|
// |C  |[[5,14/06/2012-05.20.01]]                         |
// +---+--------------------------------------------------+

在排序的方向上添加一个减号,例如 arr.map { case Row(i: Int, s: String) => (i, s) }.sortBy(-_._2),以反转排序的方向。 - Shrikant Prabhu

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接