Spark reduceByKey及如何最小化Shuffling

3
我正在处理这样的表格:

我正在处理这样的表格:

ID    f1
001   1
001   2
001   3
002   0
002   7

我希望计算相同ID下f1列的总和,并创建一个新的列来存储该总和,即:

ID    f1   sum_f1
001   1    6
001   2    6
001   3    6
002   0    7
002   7    7

我的解决方案是使用reduceByKey计算总和,然后将结果与原始表连接:
val table = sc.parallelize(Seq(("001",1),("001",2),("001",3),("002",0),("002",7)))
val sum = table.reduceByKey(_ + _)
val result = table.leftOuterJoin(sum).map{ case (a,(b,c)) => (a, b, c.getOrElse(-1) )}

我得到了正确的结果:

result.collect.foreach(println)

输出:

(002,0,7)
(002,7,7)
(001,1,6)
(001,2,6)
(001,3,6)

问题在于代码中有2个洗牌阶段,一个在reduceByKey中,另一个在leftOuterJoin中,但如果我用Hadoop MapReduce编写代码,则可以在只有1个洗牌阶段的情况下轻松获得相同结果(在reduce阶段多次使用outputer.collect函数)。 因此,我想知道是否有更好的方法可以只进行一次洗牌。欢迎提出任何建议。

1
我认为问题的标题应该修改,以明确地表示你的问题。 - Alberto Bonsanto
2个回答

1
另一种方法是使用aggregateByKey。这可能是一个难以理解的方法,但从Spark文档中可以了解到:
(groupByKey) 注意:此操作可能非常昂贵。如果您正在分组以执行每个键的聚合(例如总和或平均值), 使用PairRDDFunctions.aggregateByKeyPairRDDFunctions.reduceByKey将提供更好的性能。
此外,aggregateByKey是一个通用函数,因此值得知道。
当然,我们在这里不做“简单的汇总,如求和”,因此这种方法与groupByKey相比的性能优势可能不存在。 显然,在真实数据上对两种方法进行基准测试是个好主意。
以下是详细实现:
// The input as given by OP here: https://dev59.com/JJXfa4cB1Zd3GeqPkc5d
val table = sc.parallelize(Seq(("001", 1), ("001", 2), ("001", 3), ("002", 0), ("002", 7)))

// zero is initial value into which we will aggregate things.
// The second element is the sum.
// The first element is the list of values which contributed to this sum.
val zero = (List.empty[Int], 0)

// sequencer will receive an accumulator and the value.
// The accumulator will be reset for each key to 'zero'.
// In this sequencer we add value to the sum and append to the list because
// we want to keep both.
// This can be thought of as "map" stage in classic map/reduce.
def sequencer(acc: (List[Int], Int), value: Int) = {
  val (values, sum) = acc
  (value :: values, sum + value)
}

// combiner combines two lists and sums into one.
// The reason for this is the sequencer may run in different partitions
// and thus produce partial results. This step combines those partials into
// one final result.
// This step can be thought of as "reduce" stage in classic map/reduce.
def combiner(left: (List[Int], Int), right: (List[Int], Int)) = {
  (left._1 ++ right._1, left._2 + right._2)
}

// wiring it all together.
// Note the type of result it produces:
// Each key will have a list of values which contributed to the sum, sum the sum itself.
val result: RDD[(String, (List[Int], Int))] = table.aggregateByKey(zero)(sequencer, combiner)

// To turn this to a flat list and print, use flatMap to produce:
// (key, value, sum)
val flatResult: RDD[(String, Int, Int)] = result.flatMap(result => {
  val (key, (values, sum)) = result
  for (value <- values) yield (key, value, sum)
})

// collect and print
flatResult.collect().foreach(println)

这个产生:
(001,1,6)
(001,2,6)
(001,3,6)
(002,0,7)
(002,7,7)

如果您想引用它,这里还有一个具有完全可运行版本的gist: https://gist.github.com/ppanyukov/253d251a16fbb660f225fb425d32206a


0
你可以使用groupByKey来获取数值列表,取和并使用flatMapValues重新创建行:

val g = table.groupByKey().flatMapValues { f1s =>
  val sum = f1s.reduce(_ + _)
  f1s.map(_ -> sum)
}

但是在这段代码中,reduce 是在本地工作的,因此如果单个键具有太多值,则会失败。

另一种方法是先进行分区,然后保留 join,这样 join 就会变得便宜:

val partitioned = table.partitionBy(
  new org.apache.spark.HashPartitioner(table.partitions.size))
partitioned.cache // May or may not improve performance.
val sum = partitioned.reduceByKey(_ + _)
val result = partitioned.join(sum)

我无法猜测哪个更快。我会对所有选项进行基准测试。


谢谢!我有空的时候会对它们进行基准测试,并在评论中告诉你。 - ww_stack

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接