Spark按值排序获取集合

39

我正在尝试这个教程 http://spark.apache.org/docs/latest/quick-start.html 我首先从一个文件创建了一个集合

textFile = sc.textFile("README.md")

然后我尝试了一个命令来统计单词数:

wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)

打印集合:

 wordCounts.collect()

我发现可以使用sortByKey命令按单词对其进行排序。我想知道如何按值排序,即在此情况下,按单词在文档中出现的次数进行排序。


7
你能尝试类似以下的代码吗:textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b).map(item => item.swap).sortByKey() - eliasah
10
@user3702916 - eliasah的解决方案适用于Scala API。将其转换为Python即可运行。因此,您可以尝试使用map(lambda (x,y):(y,x))而不是map(item=>item.swap)。请注意,在翻译过程中,我已经尽力使语言简洁易懂,但并未更改原意。 - Nick Chammas
我们如何在Java API中实现这个? - Rahman Usta
...reduceByKey(lambda a, b: a+b).map(lambda x: (x[1],x[0])).sortByKey() 或者在 Python 中使用 sortByKey(0) 来进行降序排序。 - hamed
11个回答

32

通常应该在调用collect()之前进行排序,因为这会将数据集返回给驱动程序,并且这也是以Java编程方式编写Hadoop MapReduce作业的方式,以便您想要的最终输出被写入(通常)到HDFS。使用Spark API,这种方法提供了在所需位置以“原始”形式编写输出的灵活性,例如将其写入文件,以便可以将其用作进一步处理的输入。

使用Spark的Scala API,在调用collect()之前进行排序可以按照eliasah的建议并使用Tuple2.swap()两次来完成,一次在排序之前,一次在排序之后,以产生按其第二个字段(命名为_2)递增或递减排序的元组列表,并包含其第一个字段(命名为_1)中单词数量的计数。以下是在spark-shell中如何编写此脚本的示例:

// this whole block can be pasted in spark-shell in :paste mode followed by <Ctrl>D
val file = sc.textFile("some_local_text_file_pathname")
val wordCounts = file.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _, 1)  // 2nd arg configures one task (same as number of partitions)
  .map(item => item.swap) // interchanges position of entries in each tuple
  .sortByKey(true, 1) // 1st arg configures ascending sort, 2nd arg configures one task
  .map(item => item.swap)

为了反转排序顺序,请使用sortByKey(false,1)。它的第一个参数是升降序的布尔值,第二个参数是任务数(等同于分区数),对于只需要一个输出数据文件进行测试的小型输入文件,将其设置为1。reduceByKey也接受此可选参数。
之后,可以使用saveAsTextFile(directory_pathname)将wordCounts RDD保存为文本文件到目录中,其中将存放一个或多个part-xxxxx文件(以part-00000开头),根据作业配置的reducer数量来决定输出数据文件数量(每个reducer一个输出数据文件),还会有一个_SUCCESS文件表示作业是否成功,以及.crc文件。
使用pyspark,与上面显示的scala脚本非常相似的python脚本将产生基本相同的输出。这里是演示按值对集合进行排序的pyspark版本。
file = sc.textFile("file:some_local_text_file_pathname")
wordCounts = file.flatMap(lambda line: line.strip().split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b, 1) \ # last arg configures one reducer task
    .map(lambda (a, b): (b, a)) \
    .sortByKey(1, 1) \ # 1st arg configures ascending sort, 2nd configures 1 task
    .map(lambda (a, b): (b, a))

为了按降序排序,sortbyKey的第一个参数应该是0。由于Python将前导和尾随空格作为数据捕获,因此在每行空格分割之前插入strip(),但在使用spark-shell/scala时不需要这样做。
Spark和Python版本的wordCount输出的主要区别在于,Spark输出(word,3),而Python输出(u'word', 3)。
有关Spark RDD方法的更多信息,请参见http://spark.apache.org/docs/1.1.0/api/python/pyspark.rdd.RDD-class.html(Python)和https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.rdd.RDD(Scala)。
在spark-shell中,对wordCounts运行collect()将其从RDD转换为Array[(String, Int)] = Array[Tuple2(String,Int)],可以使用以下方式对每个Tuple2元素的第二个字段进行排序:
Array.sortBy(_._2) 

sortBy也可以接受一个可选的隐式math.Ordering参数,就像Romeo Kienzler在此问题的先前答案中所展示的那样。Array.sortBy(_._2)将通过在运行map-reduce脚本之前定义一个隐式反向排序来对其_2字段上的Array Tuple2元素进行反向排序,因为它会覆盖Int的现有排序。 Romeo Kienzler已经定义的反向int Ordering是:

// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
  override def compare(a: Int, b: Int) = a.compare(b)*(-1)
}

另一种定义这种反向排序的常见方法是颠倒 a 和 b 的顺序,并在比较定义的右侧去掉 (-1)。
// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
  override def compare(a: Int, b: Int) = b.compare(a)
}   

这个解决方案对Spark Streaming也适用吗?我的意思是,它能够跨批次对单词进行排序吗? - Amit Kumar

20

用更符合 Python 风格的方式实现。

# In descending order
''' The first parameter tells number of elements
    to be present in output.
''' 
data.takeOrdered(10, key=lambda x: -x[1])
# In Ascending order
data.takeOrdered(10, key=lambda x: x[1])

7

对于那些想要按值排序并获取前N个元素的人:

theRDD.takeOrdered(N, lambda (key, value): -1 * len(value))

如果您希望按字符串长度进行排序。

另一方面,如果值已经以适合您所需的顺序形式存在,则:

theRDD.takeOrdered(N, lambda (key, value): -1 * value)

足以满足。


5

你可以这样做

// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
    override def compare(a: Int, b: Int) = a.compare(b)*(-1)
}

counts.collect.toSeq.sortBy(_._2)

基本上,您需要将RDD转换为序列,然后使用sort方法对其进行排序。

上面的代码块全局更改了排序行为,以获取降序排序。


5
我认为你可以使用通用的sortBy转换(不是一个动作,即它返回一个RDD而不是一个数组),文档在此处有所介绍。因此,在你的情况下,你可以执行以下操作:
wordCounts.sortBy(lambda (word, count): count)

3

按值对输出进行排序的最简单方法是,在reduceByKey之后,您可以交换输出,将键作为值,将值作为键,然后可以使用sortByKey方法,其中false表示按降序排序。默认情况下,它会按升序排序。

 val test=textFile.flatMap(line=> line.split(" ")).map(word=> (word, 1)).reduceByKey(_ + _).map(item => item.swap).sortByKey(false)

2

@kef提供的Python解决方案非常准确...

接下来需要更改的是-

.map(lambda (a, b): (b, a))

to

.map(lambda a: (a[1], a[0]))

1
 wordCounts.map(lambda (a,b) : (b,a)).sortByKey(ascending=False).map(lambda (a,b) : (b,a)).collect()

这个解决方案是有效的,因为wordCount rdd的每一行看起来像这样:
(WORD,COUNT)
第一个map生成了一个元组顺序相反的rdd,即现在它们看起来像这样
(COUNT,WORD)
现在当我们使用sortByKey时,COUNT被视为键,这正是我们想要的。 然后第二个map将现在排序的第二个rdd映射回原始格式
(WORD,COUNT)
对于每一行,但现在行按单词计数排序。
这里的一个隐含假设是,映射不会改变RDD行的顺序,否则第二个map可能会影响排序。

3
这个回答需要一些解释... 你不能只是贴上一些代码:http://stackoverflow.com/help/how-to-answer - ted
1
仅仅发布一个方程式并不是很有帮助,除非你解释它的作用。 - daphtdazz

1

使用SCALA进行sortByValue的更好方法是:

val count = oozie.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).sortBy(x => x._2)

x._2代表列表x的第二个元素。

要按降序排序,请使用“-x._2”。

scala> val count = oozie.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).sortBy(x => -x._2)

count: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[26] at sortBy at <console>:25

scala> count.take(10)
res6: Array[(String, Int)] = Array((the,4603), (to,1707), (and,1595), (of,1337), (a,1319), (Oozie,1302), (in,1131), (.,994), (is,956), (for,753))

1
我使用Python解决了它。因此,我创建了一个键值对列表,并按值排序:
out = wordCounts.collect()
outSort = sorted(out, key=lambda word:word[1])

9
您正在将所有结果收集到驱动程序并在那里进行排序。 这种方法可以工作,但仅适用于结果集相对较小的情况。 如果需要规模化的解决方案,请参见eliasah的解决方案。 - Nick Chammas
2
这并没有解决大数据的问题。如果数据很小,为什么还需要使用Spark呢? - Shirish Kumar

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接