我认为你应该能够使用combineByKey
函数来处理这个问题,它适用于PairRDD
- 这里的键将是(group, user)
。
第一步 - 相当简单 - 是按键聚合以检索类似的结果:
(G1, U1), 30
(G1, U2), 25
(G1, U3), 20
(G2, U1), 30
(G2, U2), 40
(G2, U3), 45
您可以创建一个名为TopRanked
的类,它将简单地包含前N个结果(例如大小为N的Tuple<String,String,Double>
数组),并公开一个insert(String,String,Double)
方法。这个方法是关键点,它应该允许您正确地插入对象[String,String,Double]
到其位置。它还应该公开一个merge
方法,给定两个这些数据结构,将它们合并成一个表示合并数据结构的前N个。
然后您定义三个函数:
createCombiner: Function<[(String, String), Double)], TopRanked>
mergeValue: Function2<TopRanked, [(String, String), Double)], TopRanked>
mergeCombiners: Function2<TopRanked, TopRanked, TopRanked>
他们应该做的事情非常简单:
createCombiner
必须从行 [(String, String), Double)]
创建一个新的 TopRanked
对象,然后调用 insert(String, String, Double)
。
mergeValue
必须将 [(String, String), Double)]
插入其 TopRanked
中并返回它。
mergeCombiner
只需调用其两个 TopRanked
参数的 merge
并返回合并的 TopRanked
对象。
要使其工作,只需调用 pairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners)
(请注意,您可以添加 Partitioner 以进行优化!)。