我有以下示例,正在Spark 3.3上运行。
import pyspark.sql.functions as F
from pyspark.sql import Window
inputData = [
("1", 333),
("1", 222),
("1", 111),
("2", 334)
]
inputDf = spark.createDataFrame(inputData, schema=["id", "val"])
window = Window.partitionBy("id")
aggregatedDf = (
inputDf.withColumn("min_val", F.min(F.col("val")).over(window))
.withColumn("max_val", F.max(F.col("val")).over(window))
).show()
输出结果符合预期,每个窗口的最小/最大值都是正确的。
+---+---+-------+-------+
| id|val|min_val|max_val|
+---+---+-------+-------+
| 1|333| 111| 333|
| 1|222| 111| 333|
| 1|111| 111| 333|
| 2|334| 334| 334|
+---+---+-------+-------+
当我在窗口中添加orderBy时,输出结果不同:
window = Window.partitionBy("id").orderBy(F.col("val").desc())
+---+---+-------+-------+
| id|val|min_val|max_val|
+---+---+-------+-------+
| 1|333| 333| 333|
| 1|222| 222| 333|
| 1|111| 111| 333|
| 2|334| 334| 334|
+---+---+-------+-------+
正如您所看到的,使用desc排序max_value很好,但min_value从记录到记录不断变化。
我尝试在文档或这里的SO中找到更多信息,但没有运气。对我来说,这一点根本不直观。
我的期望是Spark将扫描给定分区中的所有记录,并为分区内的每个记录分配最小/最大值,这在窗口内没有排序时是正确的,但在添加排序时会有所不同。
有人知道为什么会这样工作吗?