I'm interested in the performance characteristics of running aggregate functions over a window, compared to group by/join. In this case I'm not interested in window functions with custom frame boundaries or ordering, only in windows as a way to run aggregate functions.
Note that I'm interested in batch (non-streaming) performance only, and for reasonably large amounts of data, so I have disabled broadcast joins for what follows.
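For completeness, that was done with the standard autoBroadcastJoinThreshold knob (this assumes a spark-shell session where spark is in scope):
// Disable broadcast joins so the join below is planned as a
// shuffle-based sort-merge join instead of a broadcast join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)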
For example, suppose we start with the following DataFrame:
val df = Seq(("bob", 10), ("sally", 32), ("mike", 9), ("bob", 18)).toDF("name", "age")
df.show(false)
+-----+---+
|name |age|
+-----+---+
|bob |10 |
|sally|32 |
|mike |9 |
|bob |18 |
+-----+---+
Say we want to compute the number of times each name appears, and attach that count to every row with the matching name.
Group By/Join
val joinResult = df.join(
  df.groupBy($"name").count,
  Seq("name"),
  "inner"
)
joinResult.show(false)
+-----+---+-----+
|name |age|count|
+-----+---+-----+
|sally|32 |1 |
|mike |9 |1 |
|bob |18 |2 |
|bob |10 |2 |
+-----+---+-----+
joinResult.explain
== Physical Plan ==
*(4) Project [name#5, age#6, count#12L]
+- *(4) SortMergeJoin [name#5], [name#15], Inner
:- *(1) Sort [name#5 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#5, 200)
: +- LocalTableScan [name#5, age#6]
+- *(3) Sort [name#15 ASC NULLS FIRST], false, 0
+- *(3) HashAggregate(keys=[name#15], functions=[count(1)])
+- Exchange hashpartitioning(name#15, 200)
+- *(2) HashAggregate(keys=[name#15], functions=[partial_count(1)])
+- LocalTableScan [name#15]
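As an aside, the shuffles can be counted programmatically instead of eyeballing the plan; a minimal sketch, assuming Spark 2.3+, where shuffle nodes in the physical plan are ShuffleExchangeExec:
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Collect the shuffle (Exchange) nodes from the executed plan.
val joinShuffles = joinResult.queryExecution.executedPlan.collect {
  case e: ShuffleExchangeExec => e
}
println(s"shuffles in join plan: ${joinShuffles.size}") // 2 in the plan above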
Window
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{functions => f}
val windowResult = df.withColumn("count", f.count($"*").over(Window.partitionBy($"name")))
windowResult.show(false)
+-----+---+-----+
|name |age|count|
+-----+---+-----+
|sally|32 |1 |
|mike |9 |1 |
|bob |10 |2 |
|bob |18 |2 |
+-----+---+-----+
windowResult.explain
== Physical Plan ==
Window [count(1) windowspecdefinition(name#5, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS count#34L], [name#5]
+- *(1) Sort [name#5 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#5, 200)
+- LocalTableScan [name#5, age#6]
Based on the execution plans, the window approach looks more efficient (fewer stages). So my questions are: is that always the case? Should I always use window functions for this kind of aggregation? Will the two approaches scale similarly as the data grows? And what about extreme skew (i.e. some names being far more common than others)?
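For what it's worth, the kind of harness I have in mind for testing this looks roughly like the following. It's a minimal sketch, and every specific in it is my own arbitrary choice rather than anything Spark prescribes: the 10M row count, the 50% skew toward one hot key, the ~100k synthetic names, and the use of .rdd.count() to force full materialization.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{functions => f}

// Skewed test data: roughly half the rows share one hot key ("bob"),
// the rest are spread across ~100k synthetic names.
val big = spark.range(10000000L).select(
  f.when(f.rand() < 0.5, f.lit("bob"))
    .otherwise(f.concat(f.lit("name_"), (f.rand() * 100000).cast("int").cast("string")))
    .as("name"),
  (f.rand() * 100).cast("int").as("age")
)
big.cache()
big.count() // materialize once so both variants read identical data

// Crude wall-clock timer around a single Spark action.
def time[T](label: String)(block: => T): T = {
  val start = System.nanoTime()
  val result = block
  println(f"$label: ${(System.nanoTime() - start) / 1e9}%.1f s")
  result
}

// .rdd.count() forces every output row (including the count column)
// to be materialized, so neither variant can shortcut the work.
time("group by/join") {
  big.join(big.groupBy(f.col("name")).count, Seq("name"), "inner").rdd.count()
}
time("window") {
  big.withColumn("count", f.count(f.col("*")).over(Window.partitionBy(f.col("name")))).rdd.count()
}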