如何在Scala Spark中对RDD进行排序?

34

阅读 Spark 中的 sortByKey 方法:

sortByKey([ascending], [numTasks])   When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

是否可以仅返回“N”个结果。因此,不是返回所有结果,而是只返回前10个结果。我可以将排序的集合转换为数组并使用take方法,但由于这是O(N)操作,是否有更有效的方法?


你知道如何排序,现在想知道如何获取前N个。我可以建议编辑问题摘要吗? - Daniel Darabos
3个回答

51

如果您只需要前十个元素,可以使用rdd.top(10)方法。这种方法避免了排序操作,因此速度更快。

rdd.top方法通过对数据进行一次并行遍历,在每个分区中收集前N个元素,并将它们存储在堆中,然后合并这些堆。该方法的时间复杂度为O(rdd.count)。如果使用排序操作,时间复杂度为O(rdd.count log rdd.count),并且会产生大量的数据传输——它需要进行shuffle操作,因此所有数据都必须在网络上传输。


2
我之前不知道这个方法。它比sort()更好。所以这比我的答案更好(尽管它可能提供了一些有用的背景知识)。我给你点赞。 - WestCoastProjects
嗨,我有一个pairRdd,有没有办法在这个pairRdd中使用top方法?例如,top(10)将返回此'pairRdd'中每个键的前10个元素。我真的需要知道这个。 - chrisTina
2
不,它不是这样工作的。我建议针对通过键值查找前10个的问题单独提出一个问题,因为这是一个更大的主题。 - Daniel Darabos
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD - Daniel Darabos

19

很可能您已经浏览了源代码:

  class OrderedRDDFunctions {
   // <snip>
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
    val part = new RangePartitioner(numPartitions, self, ascending)
    val shuffled = new ShuffledRDD[K, V, P](self, part)
    shuffled.mapPartitions(iter => {
      val buf = iter.toArray
      if (ascending) {
        buf.sortWith((x, y) => x._1 < y._1).iterator
      } else {
        buf.sortWith((x, y) => x._1 > y._1).iterator
      }
    }, preservesPartitioning = true)
  }

正如你所说,全部数据都必须经过洗牌阶段 - 如代码片段所示。

然而,你对随后调用take(K)的担忧可能并不那么准确。这个操作并没有循环遍历所有N个项目:

  /**
   * Take the first num elements of the RDD. It works by first scanning one partition, and use the
   * results from that partition to estimate the number of additional partitions needed to satisfy
   * the limit.
   */
  def take(num: Int): Array[T] = {

看起来是这样的:

O(myRdd.take(K)) << O(myRdd.sortByKey()) ~= O(myRdd.sortByKey.take(k)) (至少对于小的K值) << O(myRdd.sortByKey().collect()

注:该段内容涉及到IT技术,可能需要具备相关背景知识才能更好地理解。

sortByKey() 和其他 RDD 转换一样,都受到惰性求值的影响。sortByKey.take(k) 会优化为 takeOrdered(k, func) 还是 take(k).sortByKey 呢?这就是惰性求值的意义所在,可以优化物理计划。这个功能在数据框架中可能会实现得更好吗? - Tagar
@Ruslan,我不相信Spark Core目前会进行这种重新排列/优化。我只知道类似的优化会在SQL / Catalyst优化器中发生。 - WestCoastProjects

8
另一个选择,至少从PySpark 1.2.0开始,是使用takeOrdered函数。

按升序排列:

rdd.takeOrdered(10)

按降序:

rdd.takeOrdered(10, lambda x: -x)

针对 k,v 键值对的前 k 个最大值:

rdd.takeOrdered(10, lambda (k, v): -v)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接