如何使用combineByKey?

3

我试图使用combineByKey来获得和countByKey相同的结果。

scala> ordersMap.take(5).foreach(println)
(CLOSED,1)
(PENDING_PAYMENT,2)
(COMPLETE,3)
(CLOSED,4)
(COMPLETE,5)

这是我的输入,我想使用combineByKey来得到countByKey的输出结果。
countByKey的输出结果(正确):
PAYMENT_REVIEW 729
CLOSED 7556
SUSPECTED_FRAUD 1558
PROCESSING 8275
COMPLETE 22899
PENDING 7610
PENDING_PAYMENT 15030
ON_HOLD 3798
CANCELED 1428

我已经使用了combineByKey。

val combine = ordersMap.combineByKey(  x => 1 , (a:Int ,v) => a +1 , (a : Int,v : Int) => a +1  )

但是我得到了意外的结果,不确定原因。

combineByKey 的结果

(PENDING_PAYMENT,7600)
(CLOSED,3878)
(CANCELED,699)
(PAYMENT_REVIEW,368)
(PENDING,3764)
(ON_HOLD,1896)
(PROCESSING,4100)
(SUSPECTED_FRAUD,773)
(COMPLETE,11372)

谢谢


你可以详细说明一下,“但是我得到了意外的结果,不确定为什么。”吗?请包括异常情况以备将来参考。 - Jacek Laskowski
嗨@Jacek Laskowski,我错误地使用(a,b) => a+1将执行器的值进行相加,但这是错误的。它应该是(a,b) => a+b,因此我得到了错误的结果。 - Anaadih.pradeep
1个回答

5

这是因为你错误地应用了该函数。你传递给combineByKey的最后一个函数需要将两个可能在不同执行器上计算的累加器类型(C)的值进行组合。这就是为什么该函数被称为mergeCombiners的原因。

文档:

combineByKey[C](createCombiner: (V) ⇒ C, 
                mergeValue: (C, V) ⇒ C, 
                mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]

如何使用mergeCombiners

val combine = 
  ordersMap.combineByKey(_ => 1 , (a: Int, _) => a + 1, (a: Int, v: Int) => a + v)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接