Spark中reduceByKey、groupByKey、aggregateByKey和combineByKey的区别

98
请问有人可以解释一下 reducebykey、groupbykey、aggregatebykey 和 combinebykey 之间的区别吗?我已经阅读了相关文档,但无法理解确切的区别。最好能够附上示例进行说明。

请查看我对此主题的全面解释!https://bigdata-etl.com/apache-spark-reducebykey-vs-groupbykey-diff/ - Paweł Cieśla
6个回答

111

groupByKey:

语法:

sparkContext.textFile("hdfs://")
                    .flatMap(line => line.split(" ") )
                    .map(word => (word,1))
                    .groupByKey()
                    .map((x,y) => (x,sum(y)))
            
groupByKey可能会导致磁盘空间不足的问题,因为数据会通过网络发送并在减少的工作节点上进行收集。 reduceByKey: 语法:
sparkContext.textFile("hdfs://")
                    .flatMap(line => line.split(" "))
                    .map(word => (word,1))
                    .reduceByKey((x,y)=> (x+y))

在每个分区上将数据组合,每个键只有一个输出发送到网络。 reduceByKey 需要将所有值合并为具有完全相同类型的另一个值。

aggregateByKey:

reduceByKey相同,需要一个初始值。

输入3个参数:

  1. 初始值
  2. 组合逻辑
  3. 序列操作逻辑

示例:

val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
    val data = sc.parallelize(keysWithValuesList)
    //Create key value pairs
    val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
    val initialCount = 0;
    val addToCounts = (n: Int, v: String) => n + 1
    val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
    val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)

输出: 按键聚合求和结果 bar -> 3 foo -> 5

combineByKey:

输入三个参数

  1. 初始值:与aggregateByKey不同,不必总是传递常量,我们可以传递一个返回新值的函数。
  2. 合并函数
  3. 组合函数

示例:

val result = rdd.combineByKey(
                        (v) => (v,1),
                        ( (acc:(Int,Int),v) => acc._1 +v , acc._2 +1 ) ,
                        ( acc1:(Int,Int),acc2:(Int,Int) => (acc1._1+acc2._1) , (acc1._2+acc2._2)) 
                        ).map( { case (k,v) => (k,v._1/v._2.toDouble) })
        result.collect.foreach(println)

reduceByKey,aggregateByKey,combineByKey优于groupByKey

参考: 避免使用groupByKey


应该在聚合函数中添加一个 if 子句,以检查组合器是否仅为加法,如果是,则使用 reduceByKey 逻辑?我在这里缺少理解的东西吗,无法在编译时完成这样的操作吗?仅通过硬编码组合器来提高效率意味着应该有这样的检查,如果聚合中存在常见的组合器,就需要多个这样的检查,对吗? - devssh
这些检查甚至可以并行进行,不会妨碍计算的开始,并且可以进行优化。 - devssh

29
  • groupByKey()是基于键将您的数据集分组的操作。当RDD未被分区时,它会导致数据混洗。
  • reduceByKey()类似于分组和聚合的结合体。可以认为reduceByKey()等同于dataset.group(...).reduce(...)。与groupByKey()相比,它会少量洗牌(shuffle)数据。
  • aggregateByKey()在逻辑上与reduceByKey()相同,但它允许您以不同类型返回结果。换句话说,它允许您输入类型x,并将聚合结果作为类型y返回。例如(1,2),(1,4)作为输入和(1,"six")作为输出。它还接受一个零值(zero-value),该值将应用于每个键的开头。

注意:它们的一个共同点是它们都是wide操作。


有人知道rdd.groupByKeyrdd.reduceByKeysql.groupBy之间是否有区别吗?我有一个大型数据集,想使用最高效的方法。谢谢。 - thijsvdp

19
虽然 reduceByKey 和 groupByKey 会产生相同的结果,但在大数据集上,reduceByKey 的示例效果更好。这是因为 Spark 知道它可以在每个分区上将具有相同键的输出组合起来,然后再洗牌数据。另一方面,在调用 groupByKey 时,所有键值对都会被洗牌,这是传输网络上许多不必要的数据。更详细的信息请查看下面的链接。

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html


感谢大家的回复。 - Arun S
2
有没有任何情况下我们应该使用 groupByKey?如果函数不是可结合的,会发生什么? - Albin

12
尽管两者都可以返回相同的结果,但是在性能方面,这两个函数之间存在显著的差异。reduceByKey() 在处理大型数据集时比 groupByKey() 更有效。

reduceByKey() 中,在数据进行 shuffle 之前具有相同键的数据对会被组合(使用传递给 reduceByKey() 的函数)。然后再次调用该函数,以减少每个分区中所有值并生成一个最终结果。

groupByKey() 中,所有键值对都会被传输。这将导致大量不必要的数据在网络上传输。


6

ReduceByKey reduceByKey(func, [numTasks])-

将数据合并,以便每个分区至少有一个键的值。然后进行shuffle操作,并将其发送到某个特定执行器以执行某些操作,例如reduce。

GroupByKey - groupByKey([numTasks])

它不会合并键的值,而是直接发生shuffle过程,这里大量的数据被发送到每个分区,几乎与初始数据相同。

并且在shuffle之后为每个键合并值。这里存储在最终工作节点上的大量数据会导致内存不足问题。

AggregateByKey - aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 它类似于reduceByKey,但可以在执行聚合时提供初始值。

使用reduceByKey

  • reduceByKey可用于运行大型数据集。

  • reduceByKey当输入和输出值类型相同时,优先使用aggregateByKey

此外,建议不要使用groupByKey,而是使用reduceByKey。有关详细信息,请参见此处

您还可以参考这个问题,以更详细地了解reduceByKeyaggregateByKey的工作原理。


2
除了这四个之外,我们还有foldByKey,它与reduceByKey相同,但具有用户定义的零值。
AggregateByKey需要3个参数作为输入,并使用2个函数进行合并(一个用于在同一分区上合并,另一个用于跨分区合并值。第一个参数是ZeroValue)
而ReduceBykey仅需要1个参数,即用于合并的函数。
CombineByKey需要3个参数,所有3个都是函数。与aggregateBykey类似,不同之处在于它可以具有ZeroValue函数。
GroupByKey不需要参数并且将所有内容分组。此外,它是跨分区数据传输的负担。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接