Spark窗口聚合与Group By/Join性能比较

9

我对运行聚合函数在窗口上的性能特征感兴趣,与group by/join相比。在这种情况下,我只关注作为运行聚合函数的一种方式,而不是具有自定义帧边界或排序的窗口函数。

请注意,我只对批处理(非流)性能感兴趣,对于相当大量的数据,因此我已禁用广播连接(join)。

例如,假设我们从以下DataFrame开始:

val df = Seq(("bob", 10), ("sally", 32), ("mike", 9), ("bob", 18)).toDF("name", "age")
df.show(false)

+-----+---+
|name |age|
+-----+---+
|bob  |10 |
|sally|32 |
|mike |9  |
|bob  |18 |
+-----+---+

假设我们想要计算每个姓名出现的次数,并在与该姓名匹配的行上提供该计数。

分组/连接(Group By/Join)

val joinResult = df.join(
    df.groupBy($"name").count,
    Seq("name"),
    "inner"
)
joinResult.show(false)

+-----+---+-----+
|name |age|count|
+-----+---+-----+
|sally|32 |1    |
|mike |9  |1    |
|bob  |18 |2    |
|bob  |10 |2    |
+-----+---+-----+

joinResult.explain
== Physical Plan ==
*(4) Project [name#5, age#6, count#12L]
+- *(4) SortMergeJoin [name#5], [name#15], Inner
   :- *(1) Sort [name#5 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(name#5, 200)
   :     +- LocalTableScan [name#5, age#6]
   +- *(3) Sort [name#15 ASC NULLS FIRST], false, 0
      +- *(3) HashAggregate(keys=[name#15], functions=[count(1)])
         +- Exchange hashpartitioning(name#15, 200)
            +- *(2) HashAggregate(keys=[name#15], functions=[partial_count(1)])
               +- LocalTableScan [name#15]

窗口

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{functions => f}

val windowResult = df.withColumn("count", f.count($"*").over(Window.partitionBy($"name")))
windowResult.show(false)

+-----+---+-----+
|name |age|count|
+-----+---+-----+
|sally|32 |1    |
|mike |9  |1    |
|bob  |10 |2    |
|bob  |18 |2    |
+-----+---+-----+

windowResult.explain
== Physical Plan ==
Window [count(1) windowspecdefinition(name#5, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS count#34L], [name#5]
+- *(1) Sort [name#5 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(name#5, 200)
      +- LocalTableScan [name#5, age#6]

根据执行计划,窗口函数似乎更高效(少了几个阶段)。那么我的问题是,这总是这种情况吗?我是否应该始终使用窗口函数进行此类聚合?两种方法在数据增长时是否会以类似的方式扩展?对于极端倾斜的情况呢(即某些名称比其他名称更常见)?

2个回答

23
这取决于数据。更具体地说,它取决于“名称”列的基数。如果基数很小,则在聚合后数据量也很小,并且聚合结果可以在连接中广播。在这种情况下,连接将比窗口更快。另一方面,如果基数很大且在聚合后数据很大,则使用SortMergeJoin进行计划的连接,使用窗口将更为高效。
对于窗口的情况,我们有1个总洗牌+一个排序。对于SortMergeJoin的情况,我们在左分支中有相同的操作(总共洗牌+排序),加上右分支中的附加缩减洗牌和排序(我的意思是先聚合数据)。在连接的右分支中,我们还需要额外扫描数据。
此外,您可以查看我在Spark Summit上分析类似示例的视频

谢谢 - 我只对聚合输出过大无法进行广播连接的情况感兴趣。 - user1302130
根据groupby/join的物理计划,Spark会读取本地计划两次吗?第一次获取聚合结果,第二次在连接阶段还是它将重复使用df而不进行缓存? - zonna

3
禁用广播并使用定时方法生成一百万和两百万个随机生成的名称来生成数据,即体量较大。在一个Databricks社区集群上使用8、8、200个分区大小,方案2的执行时间似乎确实更好。所生成的计划通过窗口的排序和计数具有智能性,正如您所说,阶段更少。这似乎是关键因素。规模上来看,您可以有更多分区,但证据使我倾向于采用方法2。
引用:
我尝试了一些随机样本的名称(忽略年龄),结果如下:
- 对于一百万条记录,使用join在48.361秒内生成,使用窗口在22.028秒内生成。 - 对于两百万条记录,使用join在85.814秒内生成,在重启后使用窗口在50.566秒内生成。 - 对于两百万条记录,使用join在96.295秒内生成,使用窗口在43.875秒内生成。
使用的代码:
import scala.collection.mutable.ListBuffer
import scala.util.Random
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{functions => f}

val alpha = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
val size = alpha.size
def randStr(n:Int) = (1 to n).map(_ => alpha(Random.nextInt(size))).mkString

def timeIt[T](op: => T): Float = {
  val start = System.currentTimeMillis
  val res = op
  val end = System.currentTimeMillis
  (end - start) / 1000f
}

var names = new ListBuffer[String]()
for (i <- 1 to 2000000 ) {
    names += randStr(10)     
}
val namesList = names.toSeq
val df = namesList.toDF("name")

val joinResult = df.join(df.groupBy($"name").count, Seq("name"), "inner")
val windowResult = df.withColumn("count", f.count($"*").over(Window.partitionBy($"name")))
val time1 = timeIt(joinResult.count)
val time2 = timeIt(windowResult.count)

println(s"join in $time1 seconds vs $time2 seconds for window")

此外,该问题展示了Spark优化器仍然不够成熟。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接