在Apache Spark(Scala)中使用reduceByKey

24

我有一个元组列表,类型为:(用户ID,名称,计数)。

例如:

val x = sc.parallelize(List(
    ("a", "b", 1),
    ("a", "b", 1),
    ("c", "b", 1),
    ("a", "d", 1))
)

我试图将这个集合简化为一种类型,其中每个元素的名称都被计数。

因此,在上面的示例中,val x 被转换为:

(a,ArrayBuffer((d,1), (b,2)))
(c,ArrayBuffer((b,1)))

这是我目前正在使用的代码:

val byKey = x.map({case (id,uri,count) => (id,uri)->count})

val grouped = byKey.groupByKey
val count = grouped.map{case ((id,uri),count) => ((id),(uri,count.sum))}
val grouped2: org.apache.spark.rdd.RDD[(String, Seq[(String, Int)])] = count.groupByKey

grouped2.foreach(println)

我试图使用reduceByKey,因为它比groupByKey执行得更快。

如何实现reduceByKey来替代上面的代码以提供相同的映射?

3个回答

31

根据您的代码:

val byKey = x.map({case (id,uri,count) => (id,uri)->count})

你可以这样做:

val reducedByKey = byKey.reduceByKey(_ + _)

scala> reducedByKey.collect.foreach(println)
((a,d),1)
((a,b),2)
((c,b),1)
PairRDDFunctions[K,V].reduceByKey 接收一个可应用于 RDD[(K,V)] 中类型为 V 的 reduce 函数。换句话说,您需要一个函数 f[V](e1:V, e2:V) : V。在这个特定的案例中,使用 Ints 的 sum: (x:Int, y:Int) => x+y 或简短的下划线符号表示法 _ + _
值得一提的是:reduceByKey 性能优于 groupByKey,因为它试图在 shuffle/reduce 阶段之前本地应用 reduce 函数。而 groupByKey 将会强制对所有元素进行分组前的洗牌操作。

1
所以,基本上,reduceByKey 的结果与进行 groupBy 然后应用自定义 reduce 函数的结果相同? - Savvas Parastatidis
5
最终结果相同,但reduceByKey每个executor的内存需求为O(1),而groupByKey需要将所有分组值存储在内存中,可能导致OOM。 - maasg

6
您的原始数据结构是:RDD[(String, String, Int)],而reduceByKey只能在数据结构为RDD[(K,V)]时使用。
val kv = x.map(e => e._1 -> e._2 -> e._3) // kv is RDD[((String, String), Int)]
val reduced = kv.reduceByKey(_ + _)       // reduced is RDD[((String, String), Int)]
val kv2 = reduced.map(e => e._1._1 -> (e._1._2 -> e._2)) // kv2 is RDD[(String, (String, Int))]
val grouped = kv2.groupByKey()            // grouped is RDD[(String, Iterable[(String, Int)])]
grouped.foreach(println)

没有限制V必须是数字。唯一的要求是函数f(V,V)=>V必须是可结合的。如果不是,你将得到不一致的结果。 - maasg
那是一个错误...我当时在想的是(_ + _) :P,已更新。 - cloud

0

语法如下:

reduceByKey(func: Function2[V, V, V]): JavaPairRDD[K, V],

该函数表示在相同的RDD键中,获取值(这些值肯定是相同类型),执行作为函数一部分提供的操作,并返回与父RDD相同类型的值。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接