Spark自定义聚合:collect_list+UDF vs UDAF

13

我经常需要在Spark 2.1的数据框中执行自定义聚合,并使用以下两种方法:

  • 使用groupby/collect_list将所有值获取到单个行中,然后应用UDF来聚合这些值
  • 编写自定义UDA(用户定义的聚合函数)

一般情况下,我更喜欢第一种选项,因为它比UDAF实现更容易实现和更易读。但是我认为第一种选项通常会更慢,因为会发送更多数据到网络上(没有部分聚合),但我的经验表明UDAF通常很慢。为什么呢?

具体示例:计算直方图

数据存储在Hive表中(1E6个随机双精度值)

val df = spark.table("testtable")

def roundToMultiple(d:Double,multiple:Double) = Math.round(d/multiple)*multiple

UDF方法:

val udf_histo = udf((xs:Seq[Double]) => xs.groupBy(x => roundToMultiple(x,0.25)).mapValues(_.size))

df.groupBy().agg(collect_list($"x").as("xs")).select(udf_histo($"xs")).show(false)

+--------------------------------------------------------------------------------+
|UDF(xs)                                                                         |
+--------------------------------------------------------------------------------+
|Map(0.0 -> 125122, 1.0 -> 124772, 0.75 -> 250819, 0.5 -> 248696, 0.25 -> 250591)|
+--------------------------------------------------------------------------------+

UDAF方法

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

import scala.collection.mutable

class HistoUDAF(binWidth:Double) extends UserDefinedAggregateFunction {

  override def inputSchema: StructType =
    StructType(
      StructField("value", DoubleType) :: Nil
    )

  override def bufferSchema: StructType =
    new StructType()
      .add("histo", MapType(DoubleType, IntegerType))

  override def deterministic: Boolean = true
  override def dataType: DataType = MapType(DoubleType, IntegerType)
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Map[Double, Int]()
  }
  
  private def mergeMaps(a: Map[Double, Int], b: Map[Double, Int]) = {
    a ++ b.map { case (k,v) => k -> (v + a.getOrElse(k, 0)) }
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
     val oldBuffer = buffer.getAs[Map[Double, Int]](0)
     val newInput = Map(roundToMultiple(input.getDouble(0),binWidth) -> 1) 
     buffer(0) = mergeMaps(oldBuffer, newInput)
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val a = buffer1.getAs[Map[Double, Int]](0)
    val b = buffer2.getAs[Map[Double, Int]](0)
    buffer1(0) = mergeMaps(a, b)
  }

  override def evaluate(buffer: Row): Any = {
    buffer.getAs[Map[Double, Int]](0)
  }
}

val histo = new HistoUDAF(0.25)

df.groupBy().agg(histo($"x")).show(false)

+--------------------------------------------------------------------------------+
|histoudaf(x)                                                                    |
+--------------------------------------------------------------------------------+
|Map(0.0 -> 125122, 1.0 -> 124772, 0.75 -> 250819, 0.5 -> 248696, 0.25 -> 250591)|
+--------------------------------------------------------------------------------+

我的测试结果表明,collect_list/UDF方法比UDAF方法快大约2倍。这是一般规律,还是在某些情况下UDAF确实更快,而且笨拙的实现方式也是合理的呢?


你能找出这个原因吗? - Sudev Ambadi
8
你可能正在问错问题了。collect_list会将所有内容汇聚到一个executor中。所以应该问的问题是是否有可能出现问题。如果有可能出现问题,你应该使用udaf。如果没有collect_list出问题的风险,则可以使用udf+collect_list。 - Robert Beatty
1
@RobertBeatty 您的评论可能会被误解,"everything" 指的是一个组中的所有记录。因此,在许多组和少量偏差(因此没有非常大的组)的情况下,collect_list 方法是可行的。 - Raphael Roth
2
@RaphaelRoth,你说得对。它是按组计算的。但我的主要观点是collect_list+udf会将数据一次性全部加载到内存中,每个组都是如此。这就是为什么它的性能要好得多的原因。但也是为什么它很危险的原因。你必须确信其中一个组不会导致内存耗尽。在许多情况下,这并不是微不足道的。UDAFs对于大多数用例来说更好,因为它们更安全。它们不像udfs那样是黑匣子,并且不会将大量数据加载到内存中。如果这是你工作的主要部分,学习编写UDAFs是非常值得的。 - Robert Beatty
UDAFs有时也可以少量洗牌,因为它们在每个分区上创建一个每组预聚合,然后对这些预聚合结果进行洗牌和合并(使用merge方法)。较少的洗牌数据可以抵消内存性能。 - Lior Regev
1个回答

2

UDAF较慢,因为它在每次更新即每行数据中从/到内部缓冲区反序列化/序列化聚合器,这相当昂贵(更多细节)。相反,您应该使用Aggregator(实际上,自Spark 3.0以来,UDAF已被弃用)。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接