Spark UDAF使用ArrayType作为bufferSchema存在性能问题

5

我正在开发一个返回元素数组的UDAF。

每次更新的输入为索引和值的元组。

UDAF的功能是将相同索引下的所有值求和。

例如:

对于输入(index,value):(2,1),(3,1),(2,3)

应该返回(0,0,4,1,...,0)

逻辑是正确的,但我在update方法中有问题,我的实现只能每行更新一个单元格,但该方法中的最后一次赋值实际上复制了整个数组——这是冗余、极其耗时的。

仅此一项就占据了98%的查询执行时间

我的问题是,我如何减少这个时间?有没有可能在不替换整个缓冲区的情况下分配1个值?

注:我正在使用Spark 1.6,并且不能立即升级它,请提供适用于此版本的解决方案。

class SumArrayAtIndexUDAF() extends UserDefinedAggregateFunction{

  val bucketSize = 1000

  def inputSchema: StructType =  StructType(StructField("index",LongType) :: StructField("value",LongType) :: Nil)

  def dataType: DataType = ArrayType(LongType)

  def deterministic: Boolean = true

  def bufferSchema: StructType = {
    StructType(
      StructField("buckets", ArrayType(LongType)) :: Nil  
    )
  }

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = new Array[Long](bucketSize)
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val index = input.getLong(0)
    val value = input.getLong(1)

    val arr = buffer.getAs[mutable.WrappedArray[Long]](0)

    buffer(0) = arr   // TODO THIS TAKES WAYYYYY TOO LONG - it actually copies the entire array for every call to this method (which essentially updates only 1 cell)
  }

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val arr1 = buffer1.getAs[mutable.WrappedArray[Long]](0)
    val arr2 = buffer2.getAs[mutable.WrappedArray[Long]](0)

    for(i <- arr1.indices){
      arr1.update(i, arr1(i) + arr2(i))
    }

    buffer1(0) = arr1
  }

  override def evaluate(buffer: Row): Any = {
    buffer.getAs[mutable.WrappedArray[Long]](0)
  }
}

1
到目前为止,我找到了这个链接,它可能有助于更好地解释事情。我希望能有更好的解决方案。 - LiranBo
1个回答

13

TL;DR 要么不使用UDAF,要么在ArrayType的位置上使用原始类型。

不使用UserDefinedFunction

这两种解决方案都应该避免在内部和外部表示之间进行昂贵的切换。

使用标准聚合和pivot

这使用标准SQL聚合。尽管在内部进行了优化,但当键的数量和数组的大小增长时,它可能会很昂贵。

给定输入:

val df = Seq((1, 2, 1), (1, 3, 1), (1, 2, 3)).toDF("id", "index", "value")

您可以:

import org.apache.spark.sql.functions.{array, coalesce, col, lit}

val nBuckets = 10
@transient val values = array(
  0 until nBuckets map (c => coalesce(col(c.toString), lit(0))): _*
)

df
  .groupBy("id")
  .pivot("index", 0 until nBuckets)
  .sum("value")
  .select($"id", values.alias("values"))
+---+--------------------+                                                      
| id|              values|
+---+--------------------+
|  1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+

使用 combineByKey / aggregateByKey 的 RDD API。

使用可变缓冲区的普通的 byKey 聚合。没有特殊花哨的功能,但适用于各种输入并表现良好。如果您怀疑输入为稀疏数据,则可以考虑使用更高效的中间表示,如可变的 Map

rdd
  .aggregateByKey(Array.fill(nBuckets)(0L))(
    { case (acc, (index, value)) => { acc(index) += value; acc }},
    (acc1, acc2) => { for (i <- 0 until nBuckets) acc1(i) += acc2(i); acc1}
  ).toDF
+---+--------------------+
| _1|                  _2|
+---+--------------------+
|  1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+

如何在使用基本类型时使用UserDefinedFunction

据我所知,性能瓶颈在于ArrayConverter.toCatalystImpl

看起来它是在每个调用MutableAggregationBuffer.update时被调用的,而且每次为Row分配新的GenericArrayData

如果我们将bufferSchema重新定义为:

def bufferSchema: StructType = {
  StructType(
    0 to nBuckets map (i => StructField(s"x$i", LongType))
  )
}

无论是update还是merge,都可以被表达为在缓冲区中对原始值进行简单的替换。调用链会保持相当长,但它不需要复制/转换和疯狂的分配。省略null检查,您需要类似于:

val index = input.getLong(0)
buffer.update(index, buffer.getLong(index) + input.getLong(1))
and
for(i <- 0 to nBuckets){
  buffer1.update(i, buffer1.getLong(i) + buffer2.getLong(i))
}
分别。最后,evaluate 应该接受 Row 并将其转换为输出的 Seq:
 for (i <- 0 to nBuckets)  yield buffer.getLong(i)
请注意,在这个实现中,一个可能成为瓶颈的地方是merge。虽然它不应该引入任何新的性能问题,但是对于M 个桶,每次调用merge 的时间复杂度为O(M)
K个唯一键和P个分区的情况下,最坏情况下将被调用M * K次,其中每个键在每个分区中至少出现一次。这将有效地将merge组件的复杂度增加到O(M * N * K)
一般来说,你无法做太多关于它。但是,如果你对数据分布做出具体的假设(数据是稀疏的,键分布是均匀的),你可以简化一些东西,并首先进行洗牌:
df
  .repartition(n, $"key")
  .groupBy($"key")
  .agg(SumArrayAtIndexUDAF($"index", $"value"))

如果假设得到满足,它应该会:

  • 通过洗牌稀疏对而不是密集的类似数组Rows来出人意料地减少洗牌大小。
  • 仅使用更新的数据聚合(每个O(1)),可能只涉及子集索引。

但是,如果一个或两个假设未得到满足,你可以期望洗牌大小会增加,而更新数量会保持不变。同时,数据偏斜可能会比update- shuffle - merge方案更糟糕。

在带有“强类型”Dataset的情况下使用Aggregator:

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Encoder, Encoders}

class SumArrayAtIndex[I](f: I => (Int, Long))(bucketSize: Int)  extends Aggregator[I, Array[Long], Seq[Long]]
    with Serializable {
  def zero = Array.fill(bucketSize)(0L)
  def reduce(acc: Array[Long], x: I) = {
    val (i, v) = f(x)
    acc(i) += v
    acc
  }

  def merge(acc1: Array[Long], acc2: Array[Long]) = {
    for {
      i <- 0 until bucketSize
    } acc1(i) += acc2(i)
    acc1
  }

  def finish(acc: Array[Long]) = acc.toSeq

  def bufferEncoder: Encoder[Array[Long]] = Encoders.kryo[Array[Long]]
  def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
}

可以按照下面的方式使用

val ds = Seq((1, (1, 3L)), (1, (2, 5L)), (1, (0, 1L)), (1, (4, 6L))).toDS

ds
  .groupByKey(_._1)
  .agg(new SumArrayAtIndex[(Int, (Int, Long))](_._2)(10).toColumn)
  .show(false)
+-----+-------------------------------+
|value|SumArrayAtIndex(scala.Tuple2)  |
+-----+-------------------------------+
|1    |[1, 3, 5, 0, 6, 0, 0, 0, 0, 0] |
|2    |[0, 11, 0, 0, 0, 0, 0, 0, 0, 0]|
+-----+-------------------------------+

注意:

另请参阅 SPARK-27296 - 用户定义的聚合函数(UDAF)存在严重的效率问题


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接