我有一个元组列表,类型为:(用户ID,名称,计数)。
例如:
val x = sc.parallelize(List(
("a", "b", 1),
("a", "b", 1),
("c", "b", 1),
("a", "d", 1))
)
我试图将这个集合简化为一种类型,其中每个元素的名称都被计数。
因此,在上面的示例中,val x 被转换为:
(a,ArrayBuffer((d,1), (b,2)))
(c,ArrayBuffer((b,1)))
这是我目前正在使用的代码:
val byKey = x.map({case (id,uri,count) => (id,uri)->count})
val grouped = byKey.groupByKey
val count = grouped.map{case ((id,uri),count) => ((id),(uri,count.sum))}
val grouped2: org.apache.spark.rdd.RDD[(String, Seq[(String, Int)])] = count.groupByKey
grouped2.foreach(println)
我试图使用reduceByKey,因为它比groupByKey执行得更快。
如何实现reduceByKey来替代上面的代码以提供相同的映射?
reduceByKey
每个executor的内存需求为O(1),而groupByKey
需要将所有分组值存储在内存中,可能导致OOM。 - maasg