在Spark中,如何计算由(Key, [Value])对组成的RDD中每个对的平均值?

5

我对Scala和Spark都非常新手,如果我做得完全错误,请原谅我。在读取csv文件、筛选和映射之后,我有一个RDD,它是一堆(String, Double)对。

(b2aff711,-0.00510)
(ae095138,0.20321)
(etc.)

当我在RDD上使用.groupByKey()时,
val grouped = rdd1.groupByKey()

需要得到一个包含一系列(String,[Double])对的RDD。(我不知道CompactBuffer是什么意思,可能会导致我的问题?)

(32540b03,CompactBuffer(-0.00699, 0.256023))
(a93dec11,CompactBuffer(0.00624))
(32cc6532,CompactBuffer(0.02337, -0.05223, -0.03591))
(etc.)

一旦它们被分组,我想要计算平均值和标准差。 我想简单地使用 .mean() 和 .sampleStdev() 方法。 当我尝试创建新的 RDD 时,

val mean = grouped.mean()

返回一个错误

错误:(51,22)值mean不是org.apache.spark.rdd.RDD [(String,Iterable [Double]]的成员

val mean = grouped.mean()

我已经导入了org.apache.spark.SparkContext._
我还尝试使用sampleStdev(),.sum(),.stats()得到相同的结果。无论问题是什么,都似乎影响所有数字RDD操作。


1
我之前问过并回答了一个类似的问题,这个(以下URL)可能会对你和其他人有所帮助:https://dev59.com/Pl0a5IYBdhLWcg3wzbYU - NYCeyes
3个回答

4
让我们考虑以下内容:
val data = List(("32540b03",-0.00699), ("a93dec11",0.00624),
                ("32cc6532",0.02337) , ("32540b03",0.256023),
                ("32cc6532",-0.03591),("32cc6532",-0.03591))

val rdd = sc.parallelize(data.toSeq).groupByKey().sortByKey()

计算每对数据的平均值的一种方法如下:

您需要定义一个平均值方法:

def average[T]( ts: Iterable[T] )( implicit num: Numeric[T] ) = {
   num.toDouble( ts.sum ) / ts.size
}

您可以按以下方式将您的方法应用于RDD:
val avgs = rdd.map(x => (x._1, average(x._2)))

您可以检查以下内容:

avgs.take(3)

这是结果:

res4: Array[(String, Double)] = Array((32540b03,0.1245165), (32cc6532,-0.016149999999999998), (a93dec11,0.00624))

1

官方推荐的方式是使用reduceByKey而不是groupByKey

val result = sc.parallelize(data)
  .map { case (key, value) => (key, (value, 1)) }
  .reduceByKey { case ((value1, count1), (value2, count2))
    => (value1 + value2, count1 + count2)}
  .mapValues {case (value, count) =>  value.toDouble / count.toDouble}

另一方面,您解决方案中的问题是将对象RDD (String, Iterable[Double])(就像错误一样)进行了分组。例如,您可以计算Ints或doubles的RDD的平均值,但是对于成对的rdd,平均值将是什么?

@ablcerek,如果您的键中有多个值,那么这是否有效? - E B
@EB 我不确定我是否理解了,但如果您想要计算元素(key, value: List[Double])的rdds,则必须将第一个map更改为{ case (key, value) => (key, (value.sum, value.count))} - abalcerek

1

以下是一个没有自定义函数的完整程序:

val conf = new SparkConf().setAppName("means").setMaster("local[*]")
val sc = new SparkContext(conf)

val data = List(("Lily", 23), ("Lily", 50),
                ("Tom", 66), ("Tom", 21), ("Tom", 69),
                ("Max", 11), ("Max", 24))

val RDD = sc.parallelize(data)

val counts = RDD.map(item => (item._1, (1, item._2.toDouble)) )
val countSums = counts.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2) )
val keyMeans = countSums.mapValues(avgCount => avgCount._2 / avgCount._1)

for ((key, mean) <- keyMeans.collect()) println(key + " " + mean)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接