首先按值对JavaPairRDD进行排序,然后再按键进行排序。

12
我是一名有用的助手,可以为您翻译文本。

我正在尝试按值对RDD进行排序,如果多个值相等,则需要按字典顺序将这些值按键排序。

代码:

JavaPairRDD <String,Long> rddToSort = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , String, Long > () {

    @Override
    public Tuple2 < String, Long > call(Tuple2 < String, MovieReview > t) throws Exception {
        return new Tuple2 < String, Long > (t._1, t._2.count);
    }
});

目前我所做的是使用takeOrdered并提供一个CustomComperator,但由于takeOrdered无法处理大量数据,运行代码时它会不断退出(它会消耗大量操作系统无法处理的内存):

List < Tuple2 < String, Long >> rddSorted = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , String, Long > () {

    @Override
    public Tuple2 < String, Long > call(Tuple2 < String, MovieReview > t) throws Exception {
        return new Tuple2 < String, Long > (t._1, t._2.count);
    }
}).takeOrdered(newTopMovies, MapLongValueComparator.VALUE_COMP);

比较器:

    static class MapLongValueComparator implements Comparator < Tuple2 < String, Long >> , Serializable {
        private static final long serialVersionUID = 1L;

        private static final MapLongValueComparator VALUE_COMP = new MapLongValueComparator();

        @Override
        public int compare(Tuple2 < String, Long > o1, Tuple2 < String, Long > o2) {
            if (o1._2.compareTo(o2._2) == 0) {
                return o1._1.compareTo(o2._1);
            }
            return -o1._2.compareTo(o2._2);
        }
}

错误:

16/06/30 21:09:23 INFO scheduler.DAGScheduler: Job 18 failed: takeOrdered at MovieAnalyzer.java:708, took 418.149182 s

你会如何对这个RDD进行排序?在考虑值的情况下,如何获取 TopKMovies ,并且在键相等的情况下按字典顺序排序。谢谢。

你能提供堆栈跟踪吗(如果有的话)?因为你提到可能是内存问题,但错误消息并不允许看到发生了什么。 - Serhiy
@Serhiy 我猜这是一个内存问题,因为takeOrdered操作需要很长时间,这是因为它在分布式模式下处理了大量数据,我也得到了Exit code: 137和Exit code: 1。以其他方式接近排序肯定会解决问题。 - Jay
你尝试过重新分区数据吗?当你映射到pair时,可以紧接着重新分区。 - Serhiy
我没有这样做,我该怎么做?那有什么帮助呢?比如在应用takeOrdered之前对数据进行分区? - Jay
1
分区将会把你的数据分成多个部分,这样机器在有限制的情况下就可以处理部分而不是整个数据,因为在某个时间点,整个数据将无法适合机器内存(不要忘记 Spark 是内存计算)。在将值映射到一对后,您可以调用 partitionBy 方法。您需要通过实现一个分区器来进行分区,非常简单的情况下,可以根据字符串的第一个字母进行分区(我猜这是一个电影名?)。如果仍然出现 OOM,您可能需要进一步尝试分区。 - Serhiy
1
它可能解决takeOrdered在大量数据上的超时问题,但这并不会正确地对数据进行排序! - Jay
2个回答

3

通过使用comparator和partitions来使用sortByKey解决了这个问题,之前将 <String, Long> PairRDD 映射到了< Tuple2<String,Long> , Long> PairRDD。

JavaPairRDD <Tuple2<String,Long>, Long> sortedRdd = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , Tuple2<String,Long>, Long > () {

    @Override
    public Tuple2 < Tuple2<String,Long>, Long > call(Tuple2 < String, MovieReview > t) throws Exception {
        return new Tuple2 < Tuple2<String,Long>, Long > (new Tuple2<String,Long>(t._1,t._2.count), t._2.count);
    }
}).sortByKey(new TupleMapLongComparator(), true, 100);


JavaPairRDD <String,Long> sortedRddToPairs = sortedRdd.mapToPair(new PairFunction<Tuple2<Tuple2<String,Long>,Long>, String, Long>() {

    @Override
    public Tuple2<String, Long> call(
            Tuple2<Tuple2<String, Long>, Long> t) throws Exception {
        return new Tuple2 < String, Long > (t._1._1, t._1._2);
    }

});

比较器:

private class TupleMapLongComparator implements Comparator<Tuple2<String,Long>>, Serializable {
    @Override
    public int compare(Tuple2<String,Long> tuple1, Tuple2<String,Long> tuple2) {

        if (tuple1._2.compareTo(tuple2._2) == 0) {
            return tuple1._1.compareTo(tuple2._1);
        }
        return -tuple1._2.compareTo(tuple2._2);
    }
}

1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接