reduceByKey:它在内部如何工作?

71

我是Spark和Scala的新手。我对Spark中的reduceByKey函数工作方式感到困惑。假设我们有以下代码:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

map函数很清楚:s是键,它指向来自data.txt的一行,1是值。

然而,我不明白reduceByKey内部是如何工作的?a是否指向键?或者,a是否指向"s"?那么a+b代表什么?它们是如何填充的?

5个回答

116

让我们将其分解为离散的方法和类型,通常这样可以为新开发人员展示其复杂性:

pairs.reduceByKey((a, b) => a + b)
pairs.reduceByKey((a: Int, b: Int) => a + b)

重命名变量可以使其更加明确

pairs.reduceByKey((accumulatedValue: Int, currentValue: Int) => accumulatedValue + currentValue)

因此,我们现在可以看到,我们只是取给定键的累积值,并将其与该键的下一个值相加。现在,让我们进一步分解它,以便我们可以理解其中的关键部分。因此,让我们更多地像这样来可视化这种方法:

pairs.reduce((accumulatedValue: List[(String, Int)], currentValue: (String, Int)) => {
  //Turn the accumulated value into a true key->value mapping
  val accumAsMap = accumulatedValue.toMap   
  //Try to get the key's current value if we've already encountered it
  accumAsMap.get(currentValue._1) match { 
    //If we have encountered it, then add the new value to the existing value and overwrite the old
    case Some(value : Int) => (accumAsMap + (currentValue._1 -> (value + currentValue._2))).toList
    //If we have NOT encountered it, then simply add it to the list
    case None => currentValue :: accumulatedValue 
  }
})

因此,您可以看到,reduceByKey消除了查找键和跟踪键的模板,使您无需担心管理该部分。

如果您想更深入、更准确

尽管如此,这只是发生的简化版本,因为这里有一些优化。这个操作是可结合的,所以Spark引擎会首先在本地执行这些缩减操作(通常称为地图端缩减),然后再次在驱动程序中执行。这节省了网络流量;而不是发送所有数据并执行操作,它可以将其缩小到最小,并将该缩减发送到线路上。


64

reduceByKey函数的一个要求是它必须是可结合的。为了更好地理解reduceByKey的工作原理,让我们首先看一下如何使用可结合函数帮助我们进行并行计算:

associative function in action

正如我们所看到的,我们可以将一个原始集合分成若干部分,并通过应用可结合函数来累积总和。顺序情况很简单,我们已经习惯了:1+2+3+4+5+6+7+8+9+10。

可结合性让我们能够在顺序情况和并行情况下使用同样的函数。reduceByKey利用这个属性来计算出RDD的结果,RDD是由分区组成的分布式集合。

考虑以下示例:

// collection of the form ("key",1),("key,2),...,("key",20) split among 4 partitions
val rdd =sparkContext.parallelize(( (1 to 20).map(x=>("key",x))), 4)
rdd.reduceByKey(_ + _)
rdd.collect()
> Array[(String, Int)] = Array((key,210))

Spark中,数据被分布到分区中,下面的插图中,(4)个分区在细线框内左侧。首先,我们在每个分区中按顺序本地应用函数,但是我们同时并行运行所有4个分区。然后,通过再次应用相同的函数来聚合每个本地计算的结果,并最终得出结果。

enter image description here

reduceByKeyaggregateByKey的一种特殊情况,aggregateByKey需要两个函数:一个应用于每个分区(按顺序),另一个应用于每个分区结果之间(并行)。reduceByKey在这两种情况下使用相同的关联函数:对每个分区进行顺序计算,然后将这些结果组合成最终结果,就像我们在这里演示的那样。


14

在你的例子中

val counts = pairs.reduceByKey((a,b) => a+b)

abpairs中元组_2的整数累加器。 reduceByKey会取两个具有相同值s的元组,并使用它们的_2值作为ab,生成一个新的Tuple[String,Int]元组。此操作将重复执行,直到每个键s只剩下一个元组为止。

与非Spark(或者说非并行)的reduceByKey不同,其中第一个元素始终是累加器,第二个元素是值。 reduceByKey以分布式方式运行,即每个节点将其元组集减少到一组具有唯一键的元组集合中,然后从多个节点缩小元组,直到产生最终的具有唯一键的元组集合。这意味着随着节点结果的缩小,ab表示已经减少的累加器。


2
Spark RDD 的 reduceByKey 函数使用一个可关联的 reduce 函数合并相同键对应的值。
reduceByKey 函数仅适用于 RDD,它是一种转换操作,也就是说它是惰性评估的。需要传递一个可关联的函数作为参数,该函数将应用于源 RDD,并创建一个新的 RDD 作为结果。
因此,在您的示例中,rdd pairs 具有多个成对元素的集合,例如(s1,1),(s2,1)等。reduceByKey 接受一个函数(accumulator,n)=>(accumulator + n),该函数将累加器变量初始化为默认值 0 并添加每个键的元素,并返回具有总计数与键配对的结果 rdd counts。

0

如果您的输入RDD数据如下所示: (aa,1) (bb,1) (aa,1) (cc,1) (bb,1)

如果您在上述RDD数据上应用reduceByKey,则需要记住以下几点: reduceByKey始终采用2个输入(x,y),并且始终同时处理两行。 由于它是reduceByKey,因此它将组合相同键的两行并组合值的结果。

val rdd2 = rdd.reduceByKey((x,y) => x+y) rdd2.foreach(println)

输出: (aa,2) (bb,2) (cc,1)


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接