组合键(groupByKey)是否比减少键(reduceByKey)更受欢迎?

20
当我需要对RDD数据进行分组时,我总是使用reduceByKey,因为它在洗牌数据之前执行映射端的reduce操作,这通常意味着传输的数据量较少,从而获得更好的性能。即使映射端的reduce函数收集了所有值并且实际上没有减少数据量,我仍然使用reduceByKey,因为我假设reduceByKey的性能永远不会比groupByKey差。但是,我想知道这个假设是否正确,或者是否存在一些情况下应该优先使用groupByKey

从下面得到的答案中(感谢那些回答),@eliasah说groupByKey只是语法糖,而@climbage认为如果我使用reduceByKey来复制groupByKey功能,可能会稍微慢一些。我想我实际上会尝试在一些示例上测试这两个函数 :) - Glennie Helles Sindholt
https://dev59.com/NYvda4cB1Zd3GeqPgPOD - Knight71
我唯一需要使用groupByKey的情况是在依赖于先前值的数据样本计算中。预先计算的累加总数就是一个例子。GPS距离等。 - pestilence669
3个回答

18

我认为 climbageeliasah 忽略了问题的其他方面:

  • 代码可读性
  • 代码可维护性
  • 代码库大小

如果操作不会减少它必须处理的数据量,那么它必须在某种程度上与GroupByKey在语义上等价。假设我们有一个RDD[(Int,String)]

import scala.util.Random
Random.setSeed(1)

def randomString = Random.alphanumeric.take(Random.nextInt(10)).mkString("")

val rdd = sc.parallelize((1 to 20).map(_ => (Random.nextInt(5), randomString)))

我们希望将给定键的所有字符串连接起来。使用groupByKey非常简单:

rdd.groupByKey.mapValues(_.mkString(""))

使用reduceByKey的朴素解决方案如下:

rdd.reduceByKey(_ + _)

这段话简洁易懂,但存在两个问题:

  • 效率极低,因为它每次都会创建一个新的String对象*
  • 建议你执行的操作比实际上更加昂贵,特别是如果你只分析DAG或调试字符串

为了解决第一个问题,我们需要一个可变数据结构:

import scala.collection.mutable.StringBuilder

rdd.combineByKey[StringBuilder](
    (s: String) => new StringBuilder(s),
    (sb: StringBuilder, s: String) => sb ++= s,
    (sb1: StringBuilder, sb2: StringBuilder) => sb1.append(sb2)
).mapValues(_.toString)

这仍然表明发生了其他事情,并且在脚本中多次重复时相当冗长。当然,您可以提取匿名函数。

val createStringCombiner = (s: String) => new StringBuilder(s)
val mergeStringValue = (sb: StringBuilder, s: String) => sb ++= s
val mergeStringCombiners = (sb1: StringBuilder, sb2: StringBuilder) => 
  sb1.append(sb2)

rdd.combineByKey(createStringCombiner, mergeStringValue, mergeStringCombiners)

但是归根结底,这仍然意味着需要额外的努力来理解这段代码,增加了复杂性,却没有真正的附加值。我发现特别烦人的一件事是显式包含可变数据结构。即使Spark处理了几乎所有的复杂性,这意味着我们不再拥有优雅、引用透明的代码。

我的观点是,如果你真的通过各种方式减少数据量,请使用reduceByKey。否则,你会使你的代码更难写,更难分析,并且得不到任何回报。

注意:

本答案集中讨论Scala RDD API。当前的Python实现与其JVM版本非常不同,并包括提供了显著优势的优化,在类似于groupBy的操作中比幼稚的reduceByKey实现要好。

有关Dataset API,请参见DataFrame / Dataset groupBy behaviour/optimization


* 请参见Scala vs Python的Spark性能的令人信服的例子


7

reduceByKeygroupByKey都使用combineByKey,但具有不同的组合/合并语义。

我看到的主要区别是groupByKey向Shuffle引擎传递标志(mapSideCombine=false)。根据问题SPARK-772的描述,这是一种提示,告诉Shuffle引擎在数据大小不变时不要运行Map端合并。

因此,如果您试图使用reduceByKey来复制groupByKey,可能会稍微降低性能。


3

根据代码文档,groupByKey 操作将RDD中每个键的值分组为一个序列,并允许通过传递 Partitioner 来控制生成的键值对RDD的分区。

这个操作可能非常昂贵。如果您要对每个键执行聚合(例如求和或平均值),使用 aggregateByKeyreduceByKey 将提供更好的性能。

注意:目前实现的 groupByKey 必须能够在内存中保存任何键的所有键值对。如果一个键有太多的值,可能会导致 OOME。

事实上,我更喜欢 combineByKey 操作,但是如果您不熟悉 map-reduce 范例,很难理解 combiner 和 merger 的概念。因此,您可以阅读雅虎的 map-reduce 圣经这里,它很好地解释了这个主题。

如需更多信息,建议阅读PairRDDFunctions 代码


我理解groupByKey可能存在的问题(例如给定键的值太多)-问题是是否有时groupByKey实际上是更好的选择。您提到使用groupByKey时可以控制生成的键值对的分区,但是使用reduceByKey也可以控制它,因此似乎没有理由使用groupByKey,或者我误解了您的意思? - Glennie Helles Sindholt
1
完全正确,你可以将groupByKey视为语法糖。如果可以避免使用它,最好使用aggregateByKeyreduceByKeycombineByKey - eliasah
@GlennieHellesSindholt,你似乎并不被说服。 - eliasah
CombineByKey允许执行单调操作,而GroupByKey则不行。因此实际上不应该有相同的问题。 - eliasah
1
@shuaiyuancn 在使用 combineByKeyCompactBuffer 时,+=++=groupByKey 完全等效,但 combineByKey 允许您根据数据分布选择更有效的数据结构。可以说只有少数情况下这些不能被重新分区和外部排序替代,但对于典型用户来说,这很可能是一种低级方法。 - zero323
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接