这里是一个combineByKey的示例。目标是找到输入数据每个键的平均值。
scala> val kvData = Array(("a",1),("b",2),("a",3),("c",9),("b",6))
kvData: Array[(String, Int)] = Array((a,1), (b,2), (a,3), (c,9), (b,6))
scala> val kvDataDist = sc.parallelize(kvData,5)
kvDataDist: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> val keyAverages = kvDataDist.combineByKey(x=>(x,1),(a: (Int,Int),x: Int)=>(a._1+x,a._2+1),(b: (Int,Int),c: (Int,Int))=>(b._1+c._1,b._2+c._2))
keyAverages: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[4] at combineByKey at <console>:25
scala> keyAverages.collect
res0: Array[(String, (Int, Int))] = Array((c,(9,1)), (a,(4,2)), (b,(8,2)))
scala> val keyAveragesFinal = keyAverages.map(x => (x._1,x._2._1/x._2._2))
keyAveragesFinal: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:25
scala> keyAveragesFinal.collect
res1: Array[(String, Int)] = Array((c,9), (a,2), (b,4))
combineByKey需要三个函数作为参数:
函数1 = createCombiner:每个分区中的每个键'k'调用一次
- 输入:与键'k'关联的值
- 输出:任何所需的输出类型'O',基于程序逻辑。此输出类型将自动进一步使用。
在此示例中,选择的输出类型为(Int,Int)。
Pair中的第一个元素对值求和,第二个元素跟踪组成总和的元素数量。
函数2 = mergeValue:在分区内出现键'k'的次数减1次时调用
- 输入:(createCombiner的输出:O,此分区中与键'k'关联的后续值)
- 输出:(输出:O)
函数3 = mergeCombiners:在键存在的分区中调用多次
- 输入:(来自分区X的mergeValue的输出:O,来自分区Y的mergeValue的输出:O)
- 输出:(输出:O)
sc.parallelize([("A",1),("A",2),("B",1),("B",2),("C",1)], num_partitions)
。当num_partitions=1
时,您会得到[('A', '1_@2'), ('C', '1_'), ('B', '1_@2')]
。 - Daniel Darabos