groupByKey:
语法:
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" ") )
.map(word => (word,1))
.groupByKey()
.map((x,y) => (x,sum(y)))
groupByKey
可能会导致磁盘空间不足的问题,因为数据会通过网络发送并在减少的工作节点上进行收集。
reduceByKey:
语法:sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" "))
.map(word => (word,1))
.reduceByKey((x,y)=> (x+y))
在每个分区上将数据组合,每个键只有一个输出发送到网络。 reduceByKey
需要将所有值合并为具有完全相同类型的另一个值。
aggregateByKey:
与reduceByKey
相同,需要一个初始值。
输入3个参数:
示例:
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0;
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
输出: 按键聚合求和结果 bar -> 3 foo -> 5
combineByKey:
输入三个参数
aggregateByKey
不同,不必总是传递常量,我们可以传递一个返回新值的函数。示例:
val result = rdd.combineByKey(
(v) => (v,1),
( (acc:(Int,Int),v) => acc._1 +v , acc._2 +1 ) ,
( acc1:(Int,Int),acc2:(Int,Int) => (acc1._1+acc2._1) , (acc1._2+acc2._2))
).map( { case (k,v) => (k,v._1/v._2.toDouble) })
result.collect.foreach(println)
reduceByKey
,aggregateByKey
,combineByKey
优于groupByKey
参考: 避免使用groupByKey
if
子句,以检查组合器是否仅为加法,如果是,则使用 reduceByKey 逻辑?我在这里缺少理解的东西吗,无法在编译时完成这样的操作吗?仅通过硬编码组合器来提高效率意味着应该有这样的检查,如果聚合中存在常见的组合器,就需要多个这样的检查,对吗? - devsshgroupByKey()
是基于键将您的数据集分组的操作。当RDD未被分区时,它会导致数据混洗。reduceByKey()
类似于分组和聚合的结合体。可以认为reduceByKey()
等同于dataset.group(...).reduce(...)。与groupByKey()
相比,它会少量洗牌(shuffle)数据。aggregateByKey()
在逻辑上与reduceByKey()
相同,但它允许您以不同类型返回结果。换句话说,它允许您输入类型x,并将聚合结果作为类型y返回。例如(1,2),(1,4)作为输入和(1,"six")作为输出。它还接受一个零值(zero-value),该值将应用于每个键的开头。注意:它们的一个共同点是它们都是wide操作。
rdd.groupByKey
、rdd.reduceByKey
和sql.groupBy
之间是否有区别吗?我有一个大型数据集,想使用最高效的方法。谢谢。 - thijsvdpreduceByKey()
在处理大型数据集时比 groupByKey()
更有效。
在 reduceByKey()
中,在数据进行 shuffle 之前具有相同键的数据对会被组合(使用传递给 reduceByKey()
的函数)。然后再次调用该函数,以减少每个分区中所有值并生成一个最终结果。
在 groupByKey()
中,所有键值对都会被传输。这将导致大量不必要的数据在网络上传输。
ReduceByKey reduceByKey(func, [numTasks])
-
将数据合并,以便每个分区至少有一个键的值。然后进行shuffle操作,并将其发送到某个特定执行器以执行某些操作,例如reduce。
GroupByKey - groupByKey([numTasks])
它不会合并键的值,而是直接发生shuffle过程,这里大量的数据被发送到每个分区,几乎与初始数据相同。
并且在shuffle之后为每个键合并值。这里存储在最终工作节点上的大量数据会导致内存不足问题。
AggregateByKey - aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
它类似于reduceByKey,但可以在执行聚合时提供初始值。
使用reduceByKey
reduceByKey
可用于运行大型数据集。
reduceByKey
当输入和输出值类型相同时,优先使用aggregateByKey
。
此外,建议不要使用groupByKey
,而是使用reduceByKey
。有关详细信息,请参见此处。
您还可以参考这个问题,以更详细地了解reduceByKey
和aggregateByKey
的工作原理。