Spark中用户自定义聚合函数(UDAF)何时进行合并?

7
我想知道在哪些情况下Spark将作为UDAF函数的一部分执行合并操作。
动机:我在我的Spark项目中使用了许多窗口中的UDAF函数。通常我想回答这样一个问题:
在过去30天内,有多少次信用卡交易与当前交易发生在同一个国家?
窗口将从当前交易开始,但不包括它在计数中。它需要当前交易的值来确定在过去30天内要计算哪个国家。
val rollingWindow = Window
      .partitionBy(partitionByColumn)
      .orderBy(orderByColumn.desc)
      .rangeBetween(0, windowSize)

df.withColumn(
  outputColumnName,
  customUDAF(inputColumn, orderByColumn).over(rollingWindow))

我编写了一个自定义UDAF来进行计数。我通常使用.orderBy(orderByColumn.desc),而由于.desc的作用,在计算过程中当前交易将在窗口中排在第一位。

UDAF函数需要实现merge函数,以便在并行计算中合并两个中间聚合缓冲区。如果发生任何合并,我的当前交易可能对不同的缓冲区不同,导致UDAF的结果不正确。

我编写了一个UDAF函数来计算我的数据集中的合并次数,并仅保留窗口中的第一项交易与当前交易进行比较。

 class FirstUDAF() extends UserDefinedAggregateFunction {

  def inputSchema = new StructType().add("x", StringType)
    .add("y", StringType)

  def bufferSchema = new StructType()
    .add("first", StringType)
    .add("numMerge", IntegerType)

  def dataType = new StructType()
    .add("firstCode", StringType)
    .add("numMerge", IntegerType)

  def deterministic = true

  def initialize(buffer: MutableAggregationBuffer) = {
    buffer(0) = ""
    buffer(1) = 1
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (buffer.getString(0) == "")
      buffer(0) = input.getString(0)

  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)
  }

  def evaluate(buffer: Row) = buffer
}

当我在本地主节点上使用16个CPU和Spark 2.0.1版本运行它时,窗口中从未有任何合并,并且窗口中第一笔交易始终是当前交易。这正是我想要的。在不久的将来,我将在一个x100倍更大的数据集上运行我的代码,并在真正的分布式Spark集群上运行,想知道是否会发生合并。
问题:
  • UDAFF什么时候会发生合并?
  • 带OrderBy的窗口是否会发生合并?
  • 是否可以告诉Spark不要进行合并?
1个回答

4
在什么情况下/条件下,UDAF中进行合并?
当聚合函数的部分应用程序(“映射端聚合”)在洗牌后(“减少端聚合”)合并时,会调用merge
具有orderBy的窗口是否会有合并?
当前实现中从不。截至目前,窗口函数只是高级groupByKey,没有部分聚合。这当然是实现细节,未来可能会更改而无需进一步通知。
是否可以告诉Spark不要进行合并?
不行。但是,如果数据已按聚合键进行分区,则不需要merge,只需要使用combine
最后:
在30天的窗口中,有多少次信用卡交易与当前交易在同一国家发生?
不需要调用UDAFs或窗口函数。我可能会使用o.a.s.sql.functions.window创建滚动窗口,按用户、国家和窗口进行聚合,并与输入进行连接。

谢谢您的澄清。我接受了您的答案。对于您最后提到的一点,我不确定我理解如何做。您能否详细说明一下?如何按窗口聚合?我按用户分区,按日期排序,并计算当前交易(相对于窗口,类似于SQL中的current_row)中发生的国家次数。对于每个交易,这个国家都是不同的。 - astro_asz

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接