NotSerializableException when sorting in Spark

I am trying to write a simple streaming Spark job that takes a set of messages (in JSON format), each belonging to a user, counts the messages per user, and prints the top ten users. However, when I define the Comparator<Tuple2<String, Long>> to sort the reduced counts, the whole thing fails with a java.io.NotSerializableException. My Spark Maven dependency:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.9.3</artifactId>
    <version>0.8.0-incubating</version>
</dependency>

The Java code I'm using:
public static void main(String[] args) {

    JavaSparkContext sc = new JavaSparkContext("local", "spark");

    JavaRDD<String> lines = sc.textFile("stream.sample.txt").cache();

    JavaPairRDD<String, Long> words = lines
        .map(new Function<String, JsonElement>() {
            // parse line into JSON
            @Override
            public JsonElement call(String t) throws Exception {
                return (new JsonParser()).parse(t);
            }

        }).map(new Function<JsonElement, String>() {
            // read User ID from JSON
            @Override
            public String call(JsonElement json) throws Exception {
                return json.getAsJsonObject().get("userId").toString();
            }

        }).map(new PairFunction<String, String, Long>() {
            // pair each user ID with an initial count of 1
            @Override
            public Tuple2<String, Long> call(String arg0) throws Exception {
                return new Tuple2<String, Long>(arg0, 1L);
            }

        }).reduceByKey(new Function2<Long, Long, Long>() {
            // count messages for every user
            @Override
            public Long call(Long arg0, Long arg1) throws Exception {
                return arg0 + arg1;
            }

        });

    // sort result in a descending order and take 10 users with highest message count
    // This causes the exception
    List<Tuple2<String, Long>> sorted = words.takeOrdered(10, new Comparator<Tuple2<String, Long>> (){

        @Override
        public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
            return -1 * o1._2().compareTo(o2._2());
        }

    });

    // print result
    for (Tuple2<String, Long> tuple : sorted) {
        System.out.println(tuple._1() + ": " + tuple._2());
    }

}

The resulting stack trace:

java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:297)
    at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: net.imagini.spark.test.App$5
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:670)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:668)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:668)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:376)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
    at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)

I have looked through the Spark API documentation but found nothing that points me in the right direction. Is this a problem on my end or a bug in Spark? Any help is greatly appreciated.

Update: apparently it all comes down to the second argument passed to *takeOrdered()*, the Comparator object. Since the Comparator interface does not extend Serializable, you need to create a "serializable" comparator to make this work: `public interface SerializableComparator extends Comparator, Serializable { }` Passing an object that implements this interface as the comparator then prevents the original exception. Of course, this is probably not the most elegant way to solve the problem, and I would definitely welcome suggestions :) - vanco.anton
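For reference, a minimal sketch of the fix this comment describes, applied to the code above with a pre-Java-8 anonymous class (assuming java.io.Serializable, java.util.Comparator, and scala.Tuple2 are imported; the generic interface name follows the comment):

public interface SerializableComparator<T> extends Comparator<T>, Serializable { }

// Passing an instance of the serializable sub-interface instead of a plain
// Comparator lets Spark serialize the anonymous class and ship it to the workers:
List<Tuple2<String, Long>> sorted = words.takeOrdered(10,
    new SerializableComparator<Tuple2<String, Long>>() {
        @Override
        public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
            return -1 * o1._2().compareTo(o2._2()); // descending by message count
        }
    });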
1 Answer


As @vanco.anton mentioned, you can use Java 8 functional interfaces to do the following:

public interface SerializableComparator<T> extends Comparator<T>, Serializable {

  static <T> SerializableComparator<T> serialize(SerializableComparator<T> comparator) {
    return comparator;
  }

}

Then in your code:

import static SerializableComparator.serialize; // adjust to the interface's actual package
...
rdd.top(10, serialize((a, b) -> a._2().compareTo(b._2())));
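This works because the compiler makes a lambda serializable when its target functional interface extends Serializable; the serialize helper changes nothing at runtime, it only forces the lambda's target type to be SerializableComparator rather than plain Comparator. Note that top already returns the largest elements according to the given comparator, so a natural ascending comparison of the counts yields the users with the highest message counts.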
