Spark窗口聚合函数在记录排序方面的工作不太直观。

3

我有以下示例,正在Spark 3.3上运行。

import pyspark.sql.functions as F
from pyspark.sql import Window

inputData = [
  ("1", 333),
  ("1", 222),
  ("1", 111),
  ("2", 334)
]
inputDf = spark.createDataFrame(inputData, schema=["id", "val"])

window = Window.partitionBy("id")
aggregatedDf = (
    inputDf.withColumn("min_val", F.min(F.col("val")).over(window))
    .withColumn("max_val", F.max(F.col("val")).over(window))
).show()

输出结果符合预期,每个窗口的最小/最大值都是正确的。

+---+---+-------+-------+
| id|val|min_val|max_val|
+---+---+-------+-------+
|  1|333|    111|    333|
|  1|222|    111|    333|
|  1|111|    111|    333|
|  2|334|    334|    334|
+---+---+-------+-------+

当我在窗口中添加orderBy时,输出结果不同:

window = Window.partitionBy("id").orderBy(F.col("val").desc())


+---+---+-------+-------+
| id|val|min_val|max_val|
+---+---+-------+-------+
|  1|333|    333|    333|
|  1|222|    222|    333|
|  1|111|    111|    333|
|  2|334|    334|    334|
+---+---+-------+-------+

正如您所看到的,使用desc排序max_value很好,但min_value从记录到记录不断变化。

我尝试在文档或这里的SO中找到更多信息,但没有运气。对我来说,这一点根本不直观。

我的期望是Spark将扫描给定分区中的所有记录,并为分区内的每个记录分配最小/最大值,这在窗口内没有排序时是正确的,但在添加排序时会有所不同。

有人知道为什么会这样工作吗?

1个回答

3

您需要添加框架,才能得到您期望的输出。

根据文档

请注意,默认情况下不定义排序时,会使用无限制的窗口范围(rowFrame, 无限制前置, 无限制后继)。当定义排序时,默认使用递增的窗口范围(rangeFrame, 无限制前置, 当前行)。

基本上,Spark或任何SQL在处理该行函数时,默认情况下将考虑窗口直到当前行。通过添加框架-unboundedPreceding到unboundedFollowing-我们要求Spark考虑整个窗口。

例如,在以降序方式按值排序的数据帧中处理第二行的min函数时,Spark将考虑id = 1的第一行和第二行(在unboundedPreceding和CURRENT_ROW之间)的窗口

这将起作用

window = Window.partitionBy("id")\
.orderBy(F.col("val").desc())\
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

输出:

Out

如果想更深入了解框架,请阅读https://docs.oracle.com/cd/E17952_01/mysql-8.0-en/window-functions-frames.html

https://medium.com/expedia-group-tech/deep-dive-into-apache-spark-window-functions-7b4e39ad3c86


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接