Spark两级聚合

3
我的RDD包含三个值[组,用户,字节]。我的要求是按用户消耗的字节数进行聚合,并获取每个组中总字节数最高的前N个用户。
例如:输入数据如下:
G1 U1 10
G1 U1 20
G1 U2 25
G1 U3 20
G2 U1 30
G2 U2 15
G2 U2 25
G2 U3 45

使用top2的查询应返回以下结果:
G1 U1 30
G1 U2 25
G2 U3 45
G2 U2 40

目前我的代码如下:

rdd: RDD[(String, String), Double)
rdd.reduceByKey((x,y) => (x+y))
    .map {
       x => ((x._1._1), (x._1._2, x._2))
    }.sortBy(x => x._2._2, false)

我还没有弄清如何按GROUP值进行分组,然后只取前N个结果。有人可以帮忙吗?或者是否有更好的解决方案?


你如何创建你的RDD? - eliasah
我在从文本文件加载的RDD上使用flatMap(这是为了测试目的;实际上,我将更改为从cassandra读取)。flatMap采用以下方法:def traverseDataPoint(dataPoint:Array [Byte]):ListBuffer [((String,String),Double)] - Firdousi Farozan
2个回答

1

从您的问题来看,似乎您正在尝试获取每个组的排名(SQL)。

所以这是我的解决方案。它可能不是最有效的,但它可以工作。

val rddsum = rdd.reduceByKey((x,y) => (x+y)).map(x => (x._1._1,x._1._2,x._2))

给出与之前相同的结果。

(G1, U1, 30)
(G1, U2, 25)
(G1, U3, 20)
(G2, U1, 30)
(G2, U2, 40)
(G2, U3, 45)

现在,按第一列分组,并使用rank进行mapValues。
val grpd = rddsum.groupBy{x => x._1}

val sortAndRankedItems = grpd.mapValues{ it => it.toList.sortBy{x => x._3}.zip(Stream from 1) } 

现在,sortAndRankedItems将是类型为Array[(String, List[((String, String, String), Int)])]的数组。 因此,通过flatmapping只取第二个元素,这是感兴趣的元素,在本例中筛选topN元素,即2个,然后仅考虑第一个元素,即元组,以得出答案。
val result = sortAndRankedItems.flatMap{case(m,n) => n}.filter{x => x._2 <= 2}.map{case(x,y) => x}

希望它能有所帮助!

0

我认为你应该能够使用combineByKey函数来处理这个问题,它适用于PairRDD - 这里的键将是(group, user)

第一步 - 相当简单 - 是按键聚合以检索类似的结果:

(G1, U1), 30
(G1, U2), 25
(G1, U3), 20
(G2, U1), 30
(G2, U2), 40
(G2, U3), 45

您可以创建一个名为TopRanked的类,它将简单地包含前N个结果(例如大小为N的Tuple<String,String,Double>数组),并公开一个insert(String,String,Double)方法。这个方法是关键点,它应该允许您正确地插入对象[String,String,Double]到其位置。它还应该公开一个merge方法,给定两个这些数据结构,将它们合并成一个表示合并数据结构的前N个。

然后您定义三个函数:

createCombiner: Function<[(String, String), Double)], TopRanked>
mergeValue: Function2<TopRanked, [(String, String), Double)], TopRanked>
mergeCombiners: Function2<TopRanked, TopRanked, TopRanked>

他们应该做的事情非常简单:

createCombiner 必须从行 [(String, String), Double)] 创建一个新的 TopRanked 对象,然后调用 insert(String, String, Double)

mergeValue 必须将 [(String, String), Double)] 插入其 TopRanked 中并返回它。

mergeCombiner 只需调用其两个 TopRanked 参数的 merge 并返回合并的 TopRanked 对象。

要使其工作,只需调用 pairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners)(请注意,您可以添加 Partitioner 以进行优化!)。


谢谢。我会试一下。但是,当聚合发生在每个分区级别时,如果我们只存储该分区的 topN,那么 topN 如何正确工作呢?难道我们不必存储所有元组才能得到正确的 topN 吗? - Firdousi Farozan
如果您了解RDD的topN API的一些内部细节,请分享。 - Firdousi Farozan
1
嘿!TopRanked类看起来有点像一个围绕着简单的 [String, String, Double] 数组的封装器 - 固定大小为 N。它将有一个以大小 N 为输入的构造函数,并实例化此数组,一个 insert 方法,如果 double 作为顶部 N 个条目之一,则将 [String,String,Double] 插入其位置,否则不执行任何操作;以及一个 merge 方法,它合并两个这些结构(如融合排序)。@Anda 上面的解决方案很好,但意味着你可能想避免大数据时的洗牌操作。 - Vince.Bdn
顺便提一句,如果你想了解为什么上面的解决方案在洗牌时可能太昂贵了,这里有一些你可能想要阅读的内容:[链接](http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html) - Vince.Bdn
谢谢提供链接。我知道groupByKey操作很耗费资源。 - Firdousi Farozan
我仍在努力理解这个固定大小的N数组将如何帮助。Spark工作器将在每个分区上操作,假设有两个分区。每个分区中都会有一个N大小的数组TopRanked。每个分区中的topN将累积在此对象中。而低于topN的任何内容都将被丢弃。按照这种逻辑,我们可能会失去实际的topN,这些topN可能低于每个分区的topN。如果我的理解有误,请告诉我。 - Firdousi Farozan

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接