如何从Spark RDD计算平均值?

7

我有一个关于Spark Scala的问题,我想从Rdd数据中计算平均值,我创建了一个新的RDD,如下所示:

[(2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)]

我想这样计数,
[(2,(110+130+120)/3),(3,(200+206+206)/3),(4,(150+160+170)/3)]

然后,得到如下结果:

   [(2,120),(3,204),(4,160)]

我该如何使用Scala从RDD中实现此操作?我使用的是Spark 1.6版本。
3个回答

5
在这种情况下,您可以使用groupByKey。例如:
val rdd = spark.sparkContext.parallelize(List((2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)))
val processedRDD = rdd.groupByKey.mapValues{iterator => iterator.sum / iterator.size}
processedRDD.collect.toList

在这里,groupByKey将返回RDD[(Int, Iterator[Int])] ,然后您可以在Iterator上简单应用平均操作。

希望这对您有用

谢谢


强烈建议使用aggregateByKey或combineByKey。您的解决方案没问题,但可能会导致内存溢出错误。 - T. Gawęda
谢谢@T.Gawęda,当涉及到优化时,我的代码确实有所欠缺。我认为这只是一个小场景。 - Akash Sethi

5
你可以使用 aggregateByKey。
val rdd = sc.parallelize(Seq((2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)))
val agg_rdd = rdd.aggregateByKey((0,0))((acc, value) => (acc._1 + value, acc._2 + 1),(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val sum = agg_rdd.mapValues(x => (x._1/x._2))
sum.collect

1
您可以使用.combineByKey()来计算平均值:
val data = sc.parallelize(Seq((2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)))

val sumCountPair = data.combineByKey((x: Int) => (x.toDouble,1),
                                     (pair1: (Double, Int), x: Int) => (pair1._1 + x, pair1._2 + 1), 
                                     (pair1: (Double, Int), pair2: (Double, Int)) => (pair1._1 + pair2._1, pair1._2 + pair2._2))

val average = sumCountPair.map(x => (x._1, (x._2._1/x._2._2)))
average.collect()

这里sumCountPair返回类型为RDD[(Int, (Double, Int))],表示:(关键字,(总和值,计数值))。下一步只需将总和除以计数并返回(关键字,平均值)


1
你和Alex的答案是最好的 - 它们不需要将分区的每个元素都分组到一个节点上。 - T. Gawęda

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接