groupByKey和hashPartitioner以及mapPartitions的区别是什么?

4
所以,我有以下数据:
[ (1, data1), (1, data2), (2, data3), (1, data4), (2, data5) ]

我希望将其转换为以下形式,以便进一步处理。
[ (1, [data1, data2, data4]), (2, [data3, data5]) ]

一种方法是使用groupByKey。另一种方法是使用hashPartitioner将RDD根据键进行分区,然后使用mapPartitions处理每个键的值。哪种方法更有效呢?


你应该使用reduceByKey而不是groupByKey。请参考http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html。 - jarandaf
在这种情况下,我认为groupByKey和partition方法的效率将是相同的,因为必须执行的shuffle完全相同(将所有相同的键放入同一个分区),不同之处在于在这种情况下,groupByKey的输出正是您所期望的,而在另一种情况下,您将不得不使用mapPartitions...无论如何,我会像jarandaf建议的那样使用reduceByKey,首先将对象映射到列表中,就像mattinbits在他的答案中指出的那样。 - KBorja
我实际上选择了mapPartitions。我发现它比groupByKey方法快得多。 - MetallicPriest
1个回答

0
我认为你不能直接使用评论中推荐的 reduceByKey,但是如果你将 (Int, Object) 对映射到 (Int, List(Object)),那么你就可以使用它。我预计以下方法应该是最有效的,但实际测试是了解哪种方法适用于你特定用例的最佳方式。
object main extends App {

  import org.apache.spark.{SparkContext, SparkConf}
  import org.apache.spark.rdd.PairRDDFunctions

  val conf = new SparkConf().setMaster("local").setAppName("example")
  val sc = new SparkContext(conf)

  val data = List((1, "data1"), (1, "data2"), (2, "data3"), (1, "data4"), (2, "data5"))

  val rdd = sc.parallelize(data).mapValues(str => List(str)).reduceByKey((l1, l2) => l1 ::: l2)
  rdd.collect().foreach{
    case (key, list) => println(s"key: $key values: ${list.mkString(";")}")
  }
  sc.stop()
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接