谁能清晰解释一下Spark中的`combineByKey`方法?

11

我正在学习Spark,但是我不理解这个函数combineByKey

>>> data = sc.parallelize([("A",1),("A",2),("B",1),("B",2),("C",1)] )
>>> data.combineByKey(lambda v : str(v)+"_", lambda c, v : c+"@"+str(v), lambda c1, c2 : c1+c2).collect()

输出结果为:

[('A', '1_2_'), ('C', '1_'), ('B', '1_2_')]

首先,我很困惑:第二步中的 @ 在哪里?lambda c, v : c+"@"+v?我在结果中找不到 @

其次,我阅读了 combineByKey 的函数说明,但是我对算法流程感到困惑。


2
@eliasah的回答已经涵盖了所有内容。但我建议您尝试使用不同数量的分区:sc.parallelize([("A",1),("A",2),("B",1),("B",2),("C",1)], num_partitions)。当num_partitions=1时,您会得到[('A', '1_@2'), ('C', '1_'), ('B', '1_@2')] - Daniel Darabos
3个回答

20

groupByKey 方法不尝试合并/组合值,所以它是一种昂贵的操作。

因此,combineByKey 方法就是这样的一个优化。在使用 combineByKey 时,值被合并成一个值后在每个分区中然后将每个分区值合并为单个值。 值得注意的是,合并后的值的类型不必与原始值的类型匹配,通常情况下不会匹配。 combineByKey 函数接受3个函数作为参数:

  1. 创建组合器的函数。在 aggregateByKey 中,第一个参数仅是初始零值。在 combineByKey 中,我们提供一个函数,该函数将接受当前值作为参数并返回将与其他值合并的新值。

  2. 第二个函数是合并函数,它将一个值合并/组合到先前收集的值中。

  3. 第三个函数将合并后的值组合在一起。基本上,该函数获取在分区级别产生的新值并将它们组合在一起,直到我们最终获得一个单一的值。

换句话说,要理解 combineByKey ,有用的方法是考虑它如何处理它处理的每个元素。当 combineByKey 遍历分区中的元素时,每个元素要么具有它以前未见过的键,要么与之前的元素具有相同的键。

如果是新元素,则 combineByKey 使用我们提供的一个名为 createCombiner() 的函数为该键上的累加器创建初始值。重要的是要注意,这只发生在每个分区中第一次找到键时,而不仅仅是在 RDD 中第一次找到键时。

如果它是我们在处理该分区时之前看过的值,则会使用提供的函数 mergeValue() ,该函数将当前累加器的值和新值用于该键。

由于每个分区都是独立处理的,因此我们可以为同一个键拥有多个累加器。当我们合并来自每个分区的结果时,如果两个或更多分区具有相同键的累加器,则使用用户提供的mergeCombiners()函数合并累加器。

参考资料:


1

'@'只会在每个分区内添加。在您的示例中,每个分区似乎只有一个元素。 请尝试:

data.combineByKey(lambda v : str(v)+"_", lambda c, v : c+"@"+str(v), lambda c1, c2 : c1+'$'+c2).collect() $

并且看到不同之处


1
这里是一个combineByKey的示例。目标是找到输入数据每个键的平均值。
scala> val kvData = Array(("a",1),("b",2),("a",3),("c",9),("b",6))
kvData: Array[(String, Int)] = Array((a,1), (b,2), (a,3), (c,9), (b,6))

scala> val kvDataDist = sc.parallelize(kvData,5)
kvDataDist: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> val keyAverages = kvDataDist.combineByKey(x=>(x,1),(a: (Int,Int),x: Int)=>(a._1+x,a._2+1),(b: (Int,Int),c: (Int,Int))=>(b._1+c._1,b._2+c._2))
keyAverages: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[4] at combineByKey at <console>:25

scala> keyAverages.collect
res0: Array[(String, (Int, Int))] = Array((c,(9,1)), (a,(4,2)), (b,(8,2)))

scala> val keyAveragesFinal = keyAverages.map(x => (x._1,x._2._1/x._2._2))
keyAveragesFinal: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:25

scala> keyAveragesFinal.collect
res1: Array[(String, Int)] = Array((c,9), (a,2), (b,4))

combineByKey需要三个函数作为参数:

  1. 函数1 = createCombiner:每个分区中的每个键'k'调用一次

    • 输入:与键'k'关联的值
    • 输出:任何所需的输出类型'O',基于程序逻辑。此输出类型将自动进一步使用。 在此示例中,选择的输出类型为(Int,Int)。 Pair中的第一个元素对值求和,第二个元素跟踪组成总和的元素数量。
  2. 函数2 = mergeValue:在分区内出现键'k'的次数减1次时调用

    • 输入:(createCombiner的输出:O,此分区中与键'k'关联的后续值)
    • 输出:(输出:O)
  3. 函数3 = mergeCombiners:在键存在的分区中调用多次

    • 输入:(来自分区X的mergeValue的输出:O,来自分区Y的mergeValue的输出:O)
    • 输出:(输出:O)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接