我是一名Apache Spark的学习者,遇到了一个RDD
操作aggregate
,但我不知道它如何运作。能否有人详细地解释一下,逐步说明我们是如何得出以下代码结果的。
RDD input = {1,2,3,3}
RDD Aggregate function :
rdd.aggregate((0, 0))
((x, y) =>
(x._1 + y, x._2 + 1),
(x, y) =>
(x._1 + y._1, x._2 + y._2))
output : {9,4}
谢谢
我是一名Apache Spark的学习者,遇到了一个RDD
操作aggregate
,但我不知道它如何运作。能否有人详细地解释一下,逐步说明我们是如何得出以下代码结果的。
RDD input = {1,2,3,3}
RDD Aggregate function :
rdd.aggregate((0, 0))
((x, y) =>
(x._1 + y, x._2 + 1),
(x, y) =>
(x._1 + y._1, x._2 + y._2))
output : {9,4}
谢谢
ClassTag
,我们从类似下面的内容开始。abstract class RDD[T] extends Serializable with Logging
def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U
如果您忽略所有的额外参数,您会发现aggregate
是一个从RDD[T]
到U
的映射函数。这意味着输入RDD
中的值的类型不必与输出值的类型相同。因此,它显然与reduce
有所不同:
def reduce(func: (T, T) ⇒ T): T
折叠
:def fold(zeroValue: T)(op: (T, T) => T): T
fold
相同,aggregate
需要一个 zeroValue
。如何选择它?它应该是一个关于 combOp
的恒等(中性)元素。seqOp
,将从 (U, T)
映射到 U
combOp
,将从 (U, U)
映射到 U
seqOp
可以访问原始数据。它获取类型为 U
和类型为 T
的一些值,并返回类型为 U
的值。在您的情况下,它是具有以下签名的函数。((Int, Int), Int) => (Int, Int)
U
的参数,并返回一个类型为U
的值。如前所述,很明显它不会影响原始数据,只能对seqOp
已处理过的值进行操作。在您的情况下,此函数的签名如下:((Int, Int), (Int, Int)) => (Int, Int)
那么我们如何将所有内容组合在一起呢?
首先,使用标准的Iterator.aggregate
函数来聚合每个分区,其中zeroValue
、seqOp
和combOp
分别作为z
、seqop
和combop
传递。由于内部使用的InterruptibleIterator
未覆盖aggregate
,因此它应该被执行为一个简单的foldLeft(zeroValue)(seqOp)
接下来,使用combOp
聚合来自每个分区的部分结果
假设输入的RDD有三个分区,其中包含以下值分布:
Iterator(1, 2)
Iterator(2, 3)
Iterator()
可以期望执行(忽略绝对顺序)类似于以下内容:
val seqOp = (x: (Int, Int), y: Int) => (x._1 + y, x._2 + 1)
val combOp = (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)
Seq(Iterator(1, 2), Iterator(3, 3), Iterator())
.map(_.foldLeft((0, 0))(seqOp))
.reduce(combOp)
对于单个分区,foldLeft
可以如下所示:
Iterator(1, 2).foldLeft((0, 0))(seqOp)
Iterator(2).foldLeft((1, 1))(seqOp)
(3, 2)
并且在所有的分区上执行
Seq((3,2), (6,2), (0,0))
这些组合将会给你观察到的结果:
(3 + 6 + 0, 2 + 2 + 0)
(9, 4)
通常情况下,这是Spark中普遍存在的一种模式,您需要传递一个中性值、一个用于处理每个分区值的函数以及一个用于合并不同分区部分聚合的函数。其他一些示例包括:
aggregateByKey
Aggregators
在Spark Datasets
上。