如何对RDD进行排序

7

我有一个名为scoreTriplets的RDD[ARRAY[String]],我按照以下方式进行排序。

var ScoreTripletsArray = scoreTriplets.collect()
  if (ScoreTripletsArray.size > 0) {        
    /*Sort the ScoreTripletsArray descending by score field*/        
    scala.util.Sorting.stableSort(ScoreTripletsArray, (e1: Array[String], e2: Array[String]) => e1(3).toInt > e2(3).toInt)
}

如果缺少元素,则collect()操作会变得很重。因此,我需要在不使用collect()的情况下按score对RDD进行排序。
scoreTriples是一个RDD[ARRAY[String]],每一行RDD将存储以下变量的数组:
EdgeId sourceID destID score sourceNAme destNAme distance

请给我任何参考或提示。

2个回答

9

由于洗牌操作,排序即使不进行收集也会变得昂贵,但您可以使用sortBy方法:

import scala.util.Random

val data = Seq.fill(10)(Array.fill(3)("") :+ Random.nextInt.toString)
val rdd  = sc.parallelize(data)

val sorted = rdd.sortBy(_.apply(3).toInt)
sorted.take(3)
// Array[Array[String]] = Array(
//   Array("", "", "", -1660860558),
//   Array("", "", "", -1643214719),
//   Array("", "", "", -1206834289))

如果你只对顶部结果感兴趣,那么通常会优先选择toptakeOrdered

import scala.math.Ordering

rdd.takeOrdered(2)(Ordering.by[Array[String], Int](_.apply(3).toInt))
// Array[Array[String]] = 
//   Array(Array("", "", "", -1660860558), Array("", "", "", -1643214719))

rdd.top(2)(Ordering.by[Array[String], Int](_.apply(3).toInt))
// Array[Array[String]] = 
//   Array(Array("", "", "", 1920955686), Array("", "", "", 1597012602))

但是我有一个问题.. 我需要将RDD转换为数组。我该怎么做?因为我的剩余代码依赖于数组。 - Sandip Armal Patil
1
如果你想要所有的数据,唯一的方法就是收集。toptakeOrdered已经返回了一个数组。 - zero323
我可以使用像 rdd.top(rdd.count())(Ordering.by[Array[String], Int](_.apply(3).toInt)) 这样的代码吗?这样我就可以获取所有数据。 - Sandip Armal Patil
好的.. 但是sortBy之后跟着collect会返回数组吗?或者如何操作? - Sandip Armal Patil
1
是的,它会返回一个数组。 - zero323
显示剩余3条评论

3

RDD中有一个sortBy方法(参见文档)。您可以这样做:

scoreTriplets.sortBy( _(3).toInt )

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接