通常应该在调用collect()之前进行排序,因为这会将数据集返回给驱动程序,并且这也是以Java编程方式编写Hadoop MapReduce作业的方式,以便您想要的最终输出被写入(通常)到HDFS。使用Spark API,这种方法提供了在所需位置以“原始”形式编写输出的灵活性,例如将其写入文件,以便可以将其用作进一步处理的输入。
使用Spark的Scala API,在调用collect()之前进行排序可以按照eliasah的建议并使用Tuple2.swap()两次来完成,一次在排序之前,一次在排序之后,以产生按其第二个字段(命名为_2)递增或递减排序的元组列表,并包含其第一个字段(命名为_1)中单词数量的计数。以下是在spark-shell中如何编写此脚本的示例:
val file = sc.textFile("some_local_text_file_pathname")
val wordCounts = file.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _, 1)
.map(item => item.swap)
.sortByKey(true, 1)
.map(item => item.swap)
为了反转排序顺序,请使用sortByKey(false,1)。它的第一个参数是升降序的布尔值,第二个参数是任务数(等同于分区数),对于只需要一个输出数据文件进行测试的小型输入文件,将其设置为1。reduceByKey也接受此可选参数。
之后,可以使用saveAsTextFile(directory_pathname)将wordCounts RDD保存为文本文件到目录中,其中将存放一个或多个part-xxxxx文件(以part-00000开头),根据作业配置的reducer数量来决定输出数据文件数量(每个reducer一个输出数据文件),还会有一个_SUCCESS文件表示作业是否成功,以及.crc文件。
使用pyspark,与上面显示的scala脚本非常相似的python脚本将产生基本相同的输出。这里是演示按值对集合进行排序的pyspark版本。
file = sc.textFile("file:some_local_text_file_pathname")
wordCounts = file.flatMap(lambda line: line.strip().split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b, 1) \
.map(lambda (a, b): (b, a)) \
.sortByKey(1, 1) \
.map(lambda (a, b): (b, a))
为了按降序排序,sortbyKey的第一个参数应该是0。由于Python将前导和尾随空格作为数据捕获,因此在每行空格分割之前插入strip(),但在使用spark-shell/scala时不需要这样做。
Spark和Python版本的wordCount输出的主要区别在于,Spark输出(word,3),而Python输出(u'word', 3)。
有关Spark RDD方法的更多信息,请参见
http://spark.apache.org/docs/1.1.0/api/python/pyspark.rdd.RDD-class.html(Python)和
https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.rdd.RDD(Scala)。
在spark-shell中,对wordCounts运行collect()将其从RDD转换为Array[(String, Int)] = Array[Tuple2(String,Int)],可以使用以下方式对每个Tuple2元素的第二个字段进行排序:
Array.sortBy(_._2)
sortBy也可以接受一个可选的隐式math.Ordering参数,就像Romeo Kienzler在此问题的先前答案中所展示的那样。Array.sortBy(_._2)将通过在运行map-reduce脚本之前定义一个隐式反向排序来对其_2字段上的Array Tuple2元素进行反向排序,因为它会覆盖Int的现有排序。 Romeo Kienzler已经定义的反向int Ordering是:
implicit val sortIntegersByString = new Ordering[Int] {
override def compare(a: Int, b: Int) = a.compare(b)*(-1)
}
另一种定义这种反向排序的常见方法是颠倒 a 和 b 的顺序,并在比较定义的右侧去掉 (-1)。
implicit val sortIntegersByString = new Ordering[Int] {
override def compare(a: Int, b: Int) = b.compare(a)
}
textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b).map(item => item.swap).sortByKey()
? - eliasahmap(lambda (x,y):(y,x))
而不是map(item=>item.swap)
。请注意,在翻译过程中,我已经尽力使语言简洁易懂,但并未更改原意。 - Nick ChammassortByKey(0)
来进行降序排序。 - hamed