在JavaRDD中的Sortby

4
我正在使用Java编写Spark程序。我想对我的Map进行排序。实际上,我有一个JavaRDD,像这样:
JavaPairRDD<String, Integer> rebondCountURL = session_rebond_2.mapToPair(new PairFunction<Tuple2<String, String>, String, String>() {
        @Override
        public Tuple2<String, String> call(Tuple2<String, String> stringStringTuple2) throws Exception {
            return new Tuple2<String, String>(stringStringTuple2._2, stringStringTuple2._1);
        }
    }).groupByKey().map(new PairFunction<Tuple2<String, Iterable<String>>, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> call(Tuple2<String, Iterable<String>> stringIterableTuple2) throws Exception {
            Iterable<String> strings = stringIterableTuple2._2;
            List<String> b = new ArrayList<String>();
            for (String s : strings) {
                b.add(s);
            }
            return new Tuple2<String, Integer>(stringIterableTuple2._1, b.size());
        }
    });

我想使用Sortby对这个Java Rdd进行排序(按整数排序)。你能帮我做吗?
谢谢您的帮助。
3个回答

14
您需要创建一个函数,从每个元素中提取排序键。这是我们代码的示例。
final JavaRDD<Something> stage2 = stage1.sortBy( new Function<Something, Long>() {
  private static final long serialVersionUID = 1L;

  @Override
  public Long call( Something value ) throws Exception {
    return value.getTime();
  }
}, true, 1 );

这对我也起作用了。对于像我这样的初学者,上面的函数是按时间排序的。只想知道分区数量如何影响排序? - Vignesh Iyer
该参数控制有多少并行任务将执行排序。下一个处理级别将以此分区数量作为输入。例如,如果将此参数设置为 10 并且下一个处理级别将 RDD 保存到文件中,则会创建 10 个输出文件。 - erankl
增加分区数量会使用sortKey创建分区,但不会影响全局排序顺序吗? - darkknight444
@Notinlist stage1 是一个 Scala 中的 RDD 对象吗?因为我在 JavaRDD API 中没有找到 sortBy 方法。 - CᴴᴀZ
@CᴴᴀZ 可能我当时在谈论的是1.1.0版本(发布于2014年9月11日),但我不太确定。https://spark.apache.org/docs/1.1.0/api/java/org/apache/spark/api/java/JavaRDD.html#sortBy(org.apache.spark.api.java.function.Function,%20boolean,%20int) - Notinlist
完成后,在Java8中,我认为它应该是这样的:stage2 = stage1.sortBy((Something value) -> value.getTime() , true, 1); - vefthym

0

关于sortBy()的提示.. 如果你想对一组用户定义的对象进行排序,比如Point,则在Point类中实现Comparable<Point>接口,并重写compareTo()方法,在其中编写自己的排序逻辑。之后,sortby函数将处理排序逻辑。

注意:你的Point类还必须实现java.io.Serializable接口,否则你将遇到NotSerializable异常。


0

这是基于 @Vignesh 建议的代码。您可以使用任何自定义的 ComparatorsortBy。将比较器单独编写并在 Spark 代码中使用引用更加清晰:

 rdd ->{JavaRDD<MaxProfitDto> result = 
        rdd.keyBy(Recommendations.profitAsKey)
        .sortByKey(new CryptoVolumeComparator())
        .values()

所以,比较器看起来像下面这样:

import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Comparator;

import models.CryptoDto;
import scala.Tuple2;

public class CryptoVolumeComparator implements Comparator<Tuple2<BigDecimal, CryptoDto>>, Serializable {
    private static final long serialVersionUID = 1L;
    @Override
    public int compare(Tuple2<BigDecimal, CryptoDto> v1, Tuple2<BigDecimal, CryptoDto> v2) {
        return  v2._1().compareTo(v1._1());
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接