使用Spark Dataframe对结果进行分组和排序,并将结果转化为列表。

18
我有一个Spark Dataframe,想通过键值对元素进行分组,并将结果作为排序后的列表返回。
目前我正在使用:
```df.groupBy("columnA").agg(collect_list("columnB"))```
如何使列表中的项目按升序排序?

2
可能是如何在Spark中不使用Spark SQL对数据框进行排序?的重复问题。 - Alberto Bonsanto
2个回答

32
你可以尝试使用“sort_array”函数,该函数可在functions包中找到。
import org.apache.spark.sql.functions._
df.groupBy("columnA").agg(sort_array(collect_list("columnB")))

28
如何根据同一数据框中不同的列对 collect_list() 中的元素进行排序? - vdep
2
@vdep 请查看另一个答案 - GoodDok

20

想要给Daniel de Paulasort_array解决方案添加另一个提示。

如果您想根据不同的列对元素进行排序,可以形成一个有两个字段的结构体:

  • 按照排序的字段
  • 结果字段

由于结构体是逐个字段排序的,所以您将获得所需的顺序,所需做的就是在结果列表中的每个元素中摆脱按照排序的列。
当需要多个按照排序的列时,可以采用相同的方法。

这是一个示例,可以在本地的spark-shell中运行(使用:paste模式):

import org.apache.spark.sql.Row
import spark.implicits._

case class Employee(name: String, department: String, salary: Double)

val employees = Seq(
  Employee("JSMITH", "A", 20.0),
  Employee("AJOHNSON", "A", 650.0),
  Employee("CBAKER", "A", 650.2),
  Employee("TGREEN", "A", 13.0),
  Employee("CHORTON", "B", 111.0),
  Employee("AIVANOV", "B", 233.0),
  Employee("VSMIRNOV", "B", 11.0)
)

val employeesDF = spark.createDataFrame(employees)

val getNames = udf { salaryNames: Seq[Row] =>
  salaryNames.map { case Row(_: Double, name: String) => name }
}

employeesDF
  .groupBy($"department")
  .agg(collect_list(struct($"salary", $"name")).as("salaryNames"))
  .withColumn("namesSortedBySalary", getNames(sort_array($"salaryNames", asc = false)))
  .show(truncate = false)

结果:

+----------+--------------------------------------------------------------------+----------------------------------+
|department|salaryNames                                                         |namesSortedBySalary               |
+----------+--------------------------------------------------------------------+----------------------------------+
|B         |[[111.0, CHORTON], [233.0, AIVANOV], [11.0, VSMIRNOV]]              |[AIVANOV, CHORTON, VSMIRNOV]      |
|A         |[[20.0, JSMITH], [650.0, AJOHNSON], [650.2, CBAKER], [13.0, TGREEN]]|[CBAKER, AJOHNSON, JSMITH, TGREEN]|
+----------+--------------------------------------------------------------------+----------------------------------+

1
我认为你可以避免使用UDF。 expr(“transform(array_col,x - > x.name)”) - Wassim Maaoui
这里有一个类似的解决方案,它使用结构体来避免需要UDF。也是用pyspark编写的。 - bsauce

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接