如何在Spark Java中按值进行排序

5
JavaPairRDD<String, Float> counts = ones
            .reduceByKey(new Function2<Float, Float, Float>() {
                @Override
                public Float call(Float i1, Float i2) {
                    return i1 + i2;
                }
            });

我的输出如下所示:
id,value
100002,23.47
100003,42.78
200003,50.45
190001,30.23

我希望输出按值排序,类似于:
200003,50.45
100003,42.78
190001,30.23
100002,23.47

我该如何实现这个目标?

Spark对Keys进行“操作”。如果您需要按“values”排序并且它是Key,那么请相应地映射RDD并使用sortByKey。 - CᴴᴀZ
2个回答

4

Scala有一个很好的sortBy方法。找不到Java的等效方法,但这是Scala的实现:

  def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.size)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
    this.keyBy[K](f)
        .sortByKey(ascending, numPartitions)
        .values

基本上与上面类似,但它添加了一个键而不是交换前后位置。我使用它的方式是:.sortBy(_._2)(通过选择元组的第二个元素进行排序)。


1
有一个 sortBy 方法,但它是在 JavaRDD 上定义的,而不是 JavaPairRDD - Artem Fedosov
转换为 JavaRDD 就像 counts.map(t -> t) 一样简单,其余部分与 counts.map(t -> t).sortBy(t -> t._2, false, counts.getNumPartitions()) 相似。 - Artem Fedosov

3
我认为没有特定的API可以按值对数据进行排序。
也许您需要执行以下步骤:
1)交换键和值
2)使用sortByKey API
3)再次交换键和值
在下面的引用中查看有关sortByKey的更多详细信息:
https://spark.apache.org/docs/1.0.0/api/java/org/apache/spark/api/java/JavaPairRDD.html#sortByKey%28boolean%29 对于交换,我们可以使用Scala Tuple API:

http://www.scala-lang.org/api/current/index.html#scala.Tuple2

例如,我有来自以下函数的Java Pair RDD。
JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
  });

现在,要交换键和值,您可以使用以下代码:
JavaPairRDD<Integer, String> swappedPair = counts.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
           @Override
           public Tuple2<Integer, String> call(Tuple2<String, Integer> item) throws Exception {
               return item.swap();
           }

        });

希望这可以帮助到你。你需要注意数据类型。

Spark Java支持Scala的swap函数吗? - Subramanyam S
是的,您可以在Java中使用scala.Tuple2.swap() API。 - user1261215
我已经尝试过了,但是没有成功... 请问您能否提供如何使用swap的代码? - Subramanyam S
已更新我的回答,请核实。 - user1261215
请分享如何在交换后进行排序。 - Subramanyam S
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接