Spark中的RDD聚合

19

我是一名Apache Spark的学习者,遇到了一个RDD操作aggregate,但我不知道它如何运作。能否有人详细地解释一下,逐步说明我们是如何得出以下代码结果的。

RDD input = {1,2,3,3}

RDD Aggregate function :

rdd.aggregate((0, 0))
((x, y) =>
(x._1 + y, x._2 + 1),
(x, y) =>
(x._1 + y._1, x._2 + y._2))

output : {9,4}

谢谢


1
提示:1 + 2 + 3 + 3 = 9,1 + 1 + 1 + 1 = 4,聚合需要使用两个函数,第一个是带有起始值(0,0)的折叠函数,第二个函数是组合函数,用于在完成时连接并行线程。现在请阅读答案中的完整解释。 - devssh
2个回答

29
如果您不确定发生了什么事情,最好遵循类型。为了简洁起见省略隐式的ClassTag,我们从类似下面的内容开始。
abstract class RDD[T] extends Serializable with Logging 

def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U 

如果您忽略所有的额外参数,您会发现aggregate是一个从RDD[T]U的映射函数。这意味着输入RDD中的值的类型不必与输出值的类型相同。因此,它显然与reduce有所不同:

def reduce(func: (T, T)  T): T 

或者折叠
def fold(zeroValue: T)(op: (T, T) => T): T

fold 相同,aggregate 需要一个 zeroValue。如何选择它?它应该是一个关于 combOp 的恒等(中性)元素。
您还需要提供两个函数:
  • seqOp,将从 (U, T) 映射到 U
  • combOp,将从 (U, U) 映射到 U
仅基于这些签名,您应该已经看到只有 seqOp 可以访问原始数据。它获取类型为 U 和类型为 T 的一些值,并返回类型为 U 的值。在您的情况下,它是具有以下签名的函数。
((Int, Int), Int) => (Int, Int) 

在这个时候,您可能怀疑它被用于某种类似于折叠的操作。
第二个函数接受两个类型为U的参数,并返回一个类型为U的值。如前所述,很明显它不会影响原始数据,只能对seqOp已处理过的值进行操作。在您的情况下,此函数的签名如下:
((Int, Int), (Int, Int)) => (Int, Int) 

那么我们如何将所有内容组合在一起呢?

  1. 首先,使用标准的Iterator.aggregate函数来聚合每个分区,其中zeroValueseqOpcombOp分别作为zseqopcombop传递。由于内部使用的InterruptibleIterator未覆盖aggregate,因此它应该被执行为一个简单的foldLeft(zeroValue)(seqOp)

  2. 接下来,使用combOp聚合来自每个分区的部分结果

假设输入的RDD有三个分区,其中包含以下值分布:

  • Iterator(1, 2)
  • Iterator(2, 3)
  • Iterator()

可以期望执行(忽略绝对顺序)类似于以下内容:

val seqOp = (x: (Int, Int), y: Int) => (x._1 + y, x._2 + 1)
val combOp = (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)

Seq(Iterator(1, 2), Iterator(3, 3), Iterator())
  .map(_.foldLeft((0, 0))(seqOp))
  .reduce(combOp)

对于单个分区,foldLeft 可以如下所示:

Iterator(1, 2).foldLeft((0, 0))(seqOp)
Iterator(2).foldLeft((1, 1))(seqOp)
(3, 2)

并且在所有的分区上执行

Seq((3,2), (6,2), (0,0))

这些组合将会给你观察到的结果:

(3 + 6 + 0, 2 + 2 + 0)
(9, 4)

通常情况下,这是Spark中普遍存在的一种模式,您需要传递一个中性值、一个用于处理每个分区值的函数以及一个用于合并不同分区部分聚合的函数。其他一些示例包括:

  • aggregateByKey
  • 用户自定义聚合函数
  • Aggregators 在Spark Datasets上。

6
以下是我的理解,仅供参考:
假设您有两个节点,一个接受前两个列表元素{1,2}的输入,另一个接受{3, 3}的输入。(这里的分区仅为方便起见)
在第一个节点上:"(x,y) => (x._1+y,x._2+1)",第一个x是给定的(0,0),y是您的第一个元素1,您将得到输出结果(0+1,0+1),然后是第二个元素y=2,输出结果为(1+2,1+1),即(3,2)
在第二个节点上,同样的过程并行进行,最终您将得到(6,2)
"(x,y) => (x._1+y._1,x._2+y._2)"告诉您合并两个节点,您将得到(9,4)
需要注意的一件事是(0,0)实际上被添加到结果长度(rdd)+1次。
"scala> rdd.aggregate((1,1)) ((x,y)=>(x._1+y,x._2+1),(x,y)=>(x._1+y._1,x._2+y._2)) res1: (Int, Int) = (14,9)"

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接