reduceByKey
,因为它在洗牌数据之前执行映射端的reduce操作,这通常意味着传输的数据量较少,从而获得更好的性能。即使映射端的reduce函数收集了所有值并且实际上没有减少数据量,我仍然使用reduceByKey
,因为我假设reduceByKey
的性能永远不会比groupByKey
差。但是,我想知道这个假设是否正确,或者是否存在一些情况下应该优先使用groupByKey
?reduceByKey
,因为它在洗牌数据之前执行映射端的reduce操作,这通常意味着传输的数据量较少,从而获得更好的性能。即使映射端的reduce函数收集了所有值并且实际上没有减少数据量,我仍然使用reduceByKey
,因为我假设reduceByKey
的性能永远不会比groupByKey
差。但是,我想知道这个假设是否正确,或者是否存在一些情况下应该优先使用groupByKey
?我认为 climbage 和 eliasah 忽略了问题的其他方面:
如果操作不会减少它必须处理的数据量,那么它必须在某种程度上与GroupByKey
在语义上等价。假设我们有一个RDD[(Int,String)]
:
import scala.util.Random
Random.setSeed(1)
def randomString = Random.alphanumeric.take(Random.nextInt(10)).mkString("")
val rdd = sc.parallelize((1 to 20).map(_ => (Random.nextInt(5), randomString)))
我们希望将给定键的所有字符串连接起来。使用groupByKey
非常简单:
rdd.groupByKey.mapValues(_.mkString(""))
使用reduceByKey
的朴素解决方案如下:
rdd.reduceByKey(_ + _)
这段话简洁易懂,但存在两个问题:
String
对象*为了解决第一个问题,我们需要一个可变数据结构:
import scala.collection.mutable.StringBuilder
rdd.combineByKey[StringBuilder](
(s: String) => new StringBuilder(s),
(sb: StringBuilder, s: String) => sb ++= s,
(sb1: StringBuilder, sb2: StringBuilder) => sb1.append(sb2)
).mapValues(_.toString)
这仍然表明发生了其他事情,并且在脚本中多次重复时相当冗长。当然,您可以提取匿名函数。
val createStringCombiner = (s: String) => new StringBuilder(s)
val mergeStringValue = (sb: StringBuilder, s: String) => sb ++= s
val mergeStringCombiners = (sb1: StringBuilder, sb2: StringBuilder) =>
sb1.append(sb2)
rdd.combineByKey(createStringCombiner, mergeStringValue, mergeStringCombiners)
但是归根结底,这仍然意味着需要额外的努力来理解这段代码,增加了复杂性,却没有真正的附加值。我发现特别烦人的一件事是显式包含可变数据结构。即使Spark处理了几乎所有的复杂性,这意味着我们不再拥有优雅、引用透明的代码。
我的观点是,如果你真的通过各种方式减少数据量,请使用reduceByKey
。否则,你会使你的代码更难写,更难分析,并且得不到任何回报。
注意:
本答案集中讨论Scala RDD
API。当前的Python实现与其JVM版本非常不同,并包括提供了显著优势的优化,在类似于groupBy
的操作中比幼稚的reduceByKey
实现要好。
有关Dataset
API,请参见DataFrame / Dataset groupBy behaviour/optimization。
* 请参见Scala vs Python的Spark性能的令人信服的例子
reduceByKey
和groupByKey
都使用combineByKey
,但具有不同的组合/合并语义。
我看到的主要区别是groupByKey
向Shuffle引擎传递标志(mapSideCombine=false
)。根据问题SPARK-772的描述,这是一种提示,告诉Shuffle引擎在数据大小不变时不要运行Map端合并。
因此,如果您试图使用reduceByKey
来复制groupByKey
,可能会稍微降低性能。
根据代码文档,groupByKey
操作将RDD中每个键的值分组为一个序列,并允许通过传递 Partitioner
来控制生成的键值对RDD的分区。
这个操作可能非常昂贵。如果您要对每个键执行聚合(例如求和或平均值),使用 aggregateByKey
或 reduceByKey
将提供更好的性能。
注意:目前实现的 groupByKey
必须能够在内存中保存任何键的所有键值对。如果一个键有太多的值,可能会导致 OOME。
事实上,我更喜欢 combineByKey
操作,但是如果您不熟悉 map-reduce 范例,很难理解 combiner 和 merger 的概念。因此,您可以阅读雅虎的 map-reduce 圣经这里,它很好地解释了这个主题。
如需更多信息,建议阅读PairRDDFunctions 代码。
groupByKey
可能存在的问题(例如给定键的值太多)-问题是是否有时groupByKey
实际上是更好的选择。您提到使用groupByKey
时可以控制生成的键值对的分区,但是使用reduceByKey
也可以控制它,因此似乎没有理由使用groupByKey
,或者我误解了您的意思? - Glennie Helles SindholtgroupByKey
视为语法糖。如果可以避免使用它,最好使用aggregateByKey
、reduceByKey
或combineByKey
。 - eliasahcombineByKey
与 CompactBuffer
时,+=
和 ++=
与 groupByKey
完全等效,但 combineByKey
允许您根据数据分布选择更有效的数据结构。可以说只有少数情况下这些不能被重新分区和外部排序替代,但对于典型用户来说,这很可能是一种低级方法。 - zero323
groupByKey
只是语法糖,而@climbage认为如果我使用reduceByKey
来复制groupByKey
功能,可能会稍微慢一些。我想我实际上会尝试在一些示例上测试这两个函数 :) - Glennie Helles Sindholt