Spark聚合函数-aggregateByKey如何工作?

46

假设我有一个分布式系统,由3个节点组成,我的数据在这些节点之间分布。例如,我有一个test.csv文件,在所有3个节点上都存在,并且包含2列:

**row   | id,  c.**
---------------
row1  | k1 , c1  
row2  | k1 , c2  
row3  | k1 , c3  
row4  | k2 , c4  
row5  | k2 , c5  
row6  | k2 , c6  
row7  | k3 , c7  
row8  | k3 , c8  
row9  | k3 , c9  
row10 | k4 , c10   
row11 | k4 , c11  
row12 | k4 , c12 
然后我使用SparkContext.textFile将文件读取为rdd等内容。据我理解,每个Spark工作节点将从文件中读取一部分。所以现在假设每个节点将存储:
- 节点1:第1到4行 - 节点2:第5到8行 - 节点3:第9到12行
我的问题是,假设我想对这些数据进行计算,并且有一个步骤需要将键分组在一起,因此键值对将是[k1 [{k1 c1} {k1 c2} {k1 c3}]]..等等。
有一个名为groupByKey()的函数,使用它非常昂贵,建议使用aggregateByKey()。因此,我想知道groupByKey()aggregateByKey()在背后如何工作?可以用我提供的示例来解释吗?洗牌后,每个节点上的行位于哪里?
2个回答

89

aggregateByKey()与reduceByKey非常不同。使用aggregateByKey()可以将一个特定键的值组合起来,而这种组合的结果可以是您指定的任何对象。您必须指定如何在一个分区内(在同一节点上执行)将值组合(“添加”),以及如何将来自不同分区的结果组合起来(可能在不同节点上)。reduceByKey是一种特殊情况,因为组合的结果(例如总和)与值的类型相同,并且当从不同的分区组合时,操作也与在分区内组合值时相同。

例如:假设有一组成对的列表,您想并行化它:

val pairs = sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))

现在你想要通过关键字合并它们并生成一个总和。在这种情况下,reduceByKey和aggregateByKey是相同的:

val resReduce = pairs.reduceByKey(_ + _) //the same operation for everything
resReduce.collect
res3: Array[(String, Int)] = Array((b,7), (a,9))

//0 is initial value, _+_ inside partition, _+_ between partitions
val resAgg = pairs.aggregateByKey(0)(_+_,_+_)
resAgg.collect
res4: Array[(String, Int)] = Array((b,7), (a,9))

现在,想象一下你希望聚合的是值的一组Set,这是与整数值不同的类型(因为整数之和也是整数):

import scala.collection.mutable.HashSet
//the initial value is a void Set. Adding an element to a set is the first
//_+_ Join two sets is the  _++_
val sets = pairs.aggregateByKey(new HashSet[Int])(_+_, _++_)
sets.collect
res5: Array[(String, scala.collection.mutable.HashSet[Int])]  =Array((b,Set(7)), (a,Set(1, 5, 3)))

1
非常详细的回答,对此表示感激! - SparkleGoat
你能否提供一份Java代码呢?因为理解Scala很困难。 - rohanagarwal

54

aggregateByKey()reduceByKey()几乎相同(它们都在后台调用combineByKey()),只是你需要为aggregateByKey()提供一个起始值。由于大多数人熟悉reduceByKey(),因此我将在解释中使用它。

reduceByKey()之所以更好,是因为它利用了一种MapReduce功能,称为combiner。任何像+*这样的函数都可以以这种方式使用,因为调用该函数的元素顺序并不重要。这使得Spark能够在尚未全部处于同一分区的情况下开始“缩减”具有相同键的值。

另一方面,groupByKey()提供更多的灵活性,因为您可以编写一个接受Iterable的函数,这意味着您甚至可以将所有元素提取到数组中。但是它效率低下,因为为了使其工作,完整的(K,V,)对集必须位于一个分区中。

在减少类型操作中移动数据的步骤通常称为shuffle,在最简单的级别上,数据被分区到每个节点(通常是使用哈希分区器),然后在每个节点上进行排序。


2
好的,让我们回到我的例子,如果node1有row1row3,node2有row4row6,node3有row7到row12。当我执行groupByKey时,数据是否会移动或者因为具有相同键的rdd已经在同一个节点上而没有移动?谢谢。 - EdwinGuo
1
@EdwinGuo 不是的,数据仍然可以移动。比如说你正在使用哈希分区器,如果所有的k1都在节点1上,但是k1的哈希分区器结果是3,它仍然会被发送到第三个节点。 - aaronman
但是,如果我不在乎顺序,只想返回包含所有值的数组,就像groupByKey一样,是否可以使用除groupByKey之外的其他语法实现? - Adriano Almeida
@AdrianoAlmeida 如果您甚至不想将相同的键放入同一个数组中,您可以使用glom。 - aaronman

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接