Spark --> java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda


When I launch the application that computes the average per key, I get the error below. I use the combineByKey function with lambda expressions (Java 8). I read a file whose records have three fields (key, time, float). Both the workers and the master run Java 8.

 16/05/06 15:48:23 INFO DAGScheduler: ShuffleMapStage 0 (mapToPair at ProcesarFichero.java:115) failed in 3.774 s
    16/05/06 15:48:23 INFO DAGScheduler: Job 0 failed: saveAsTextFile at ProcesarFichero.java:153, took 3.950483 s
    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 5, mcava-slave0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.fun$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1
            at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
            at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
            at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
            at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:64)
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
            at org.apache.spark.scheduler.Task.run(Task.scala:89)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

    Driver stacktrace:
            at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
            at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
            at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
            at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
            at scala.Option.foreach(Option.scala:236)
            at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
            at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
            at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1922)
            at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1213)
            at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
            at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
            at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
            at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1156)
            at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1060)
            at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026)
            at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
            at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
            at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
            at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:952)
            at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:952)
            at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:952)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
            at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
            at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:951)
            at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1443)
            at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1422)
            at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1422)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
            at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
            at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1422)
            at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:507)
            at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:46)
            at com.baitic.mcava.spark.ProcesarFichero.main(ProcesarFichero.java:153)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
            at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
            at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
            at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
            at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.fun$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1
            at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
            at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
            at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
            at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:64)
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
            at org.apache.spark.scheduler.Task.run(Task.scala:89)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

This is the code that throws the exception:

    AvgCount initial = new AvgCount(0.0F, 0);
    JavaPairRDD<String, AvgCount> avgCounts =
            pairs.combineByKey(
                    (Float x) -> new AvgCount(x, 1),
                    (AvgCount a, Float x) -> new AvgCount(a.total_ + x, a.num_ + 1),
                    (AvgCount a, AvgCount b) -> new AvgCount(a.total_ + b.total_, a.num_ + b.num_));
    avgCounts.saveAsTextFile("hdfs://mcava-master:54310/srv/hadoop/data/spark/xxmedidasSensorca");
    }

 public static class AvgCount implements Serializable {
        public AvgCount(Float total, int num) {
            total_ = total;
            num_ = num;
        }
        public Float total_;
        public int num_;
        public float avg() {
            return total_ / (float) num_;
        }
    }
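
The post does not show how `pairs` is built. Purely as an illustration, here is a minimal sketch of what the mapToPair step (ProcesarFichero.java:115 in the stack trace) might look like for records of the form key, time, value; the input path, field separator and variable names are assumptions, not taken from the original code:

    // Hypothetical reconstruction of the pairs RDD used above.
    // Assumes: sc is the JavaSparkContext, each line is "key,time,value",
    // and the imports scala.Tuple2, org.apache.spark.api.java.JavaRDD
    // and org.apache.spark.api.java.JavaPairRDD are present.
    JavaRDD<String> lines = sc.textFile("hdfs://mcava-master:54310/path/to/input"); // placeholder path
    JavaPairRDD<String, Float> pairs = lines.mapToPair(line -> {
        String[] fields = line.split(",");   // key, time, value
        return new Tuple2<>(fields[0], Float.parseFloat(fields[2]));
    });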

I use the conf.setJars() method to distribute a fat jar containing all the dependencies.
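
For reference, a minimal sketch of how the fat jar can be registered on the SparkConf; the app name, master URL and jar path below are placeholders, not values from the original post:

    SparkConf conf = new SparkConf()
            .setAppName("ProcesarFichero")              // hypothetical app name
            .setMaster("spark://mcava-master:7077")     // assumed standalone master URL
            // Ship the assembled fat jar to the executors so the classes that
            // back the Java 8 lambdas can be deserialized on the workers.
            .setJars(new String[]{"/path/to/app-with-dependencies.jar"}); // placeholder path
    JavaSparkContext sc = new JavaSparkContext(conf);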


I ran into the same exception and solved it by providing the fat jar to the Spark configuration via the setJars() method. How do you configure the Spark master property? Also, see this very good answer on the issue. - Philip Raschke
I usually configure this in the Java code through the configuration context object (.setJars(PATH)), or alternatively by passing the path when launching the submit script ($SPARK_HOME/bin/spark-submit ... --jars PATH). Sorry for my English. - Miren
Then that should be fine. Another pitfall is a version mismatch between Spark and Scala. Which versions are you using? - Philip Raschke
1 Answer


I set up the SparkConf with the .setJars method and it worked, but make sure the path to the jar file is correct. I struggled with this because the jar path was wrong; only after debugging and reading user.dir from the system properties was I able to fix the path and get the solution to work.
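
As an illustration of that debugging step, a small sketch that prints user.dir and resolves the jar relative to it; the relative jar location is an assumption:

    // Print the working directory the driver actually runs in, then
    // resolve the fat jar against it before registering it with setJars.
    String workingDir = System.getProperty("user.dir");
    System.out.println("user.dir = " + workingDir);
    String jarPath = workingDir + "/target/app-with-dependencies.jar"; // hypothetical location
    conf.setJars(new String[]{jarPath});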

