我有一段Spark代码,在Spark 1.3上运行正常,但当我将其移植到Spark 1.5.2时失败了(集群升级超出我的控制)。失败的信息如下:
Caused by: java.io.NotSerializableException: com.location.model.Profile
Serialization stack:
- object not serializable (class: com.location.model.Profile, value: com.location.model.Profile@596032b0)
- field (class: org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1, name: zeroValue$3, type: class java.lang.Object)
- object (class org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1, <function0>)
- field (class: org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1$$anonfun$1, name: $outer, type: class org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1)
- object (class org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1$$anonfun$1, <function0>)
- field (class: org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1$$anonfun$apply$10, name: createZero$1, type: interface scala.Function0)
- object (class org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1$$anonfun$apply$10, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
有趣的一点是,手头的这个类
Profile
被声明为class Profile() extends KryoSerializable
并覆盖了该接口的读/写方法。我还将此配置设置为Spark-submit:
"--conf" -> "'spark.serializer=org.apache.spark.serializer.KryoSerializer'"
,并通过conf.registerKryoClasses(Array(classOf[Profile], ...
注册了Profile
类。所以一切都按照Spark Tunning guide中的说明进行操作,之前运行得很好。 请注意,异常显示
ClosureCleaner
正在使用JavaSerializerInstance
,如果我将Profile
类添加extends Serializable
,它确实可以工作。但我不确定为什么它在使用那个序列化程序,也不知道为什么我需要与Java Serialization兼容,如果我明确要求使用Kryo。
编辑:我甚至完全删除了参数,因为在
registerKryoClasses
下的代码无论如何都会设置该属性。实际上,我怀疑正在使用Kryo序列化(我在write
内部添加了println并出现了,但某种先前的验证不正确)。
registerKryoClasses
期望注册一个类数组。Profile
只是这些类中的第一个。你提到的看起来像是classOf [Array [Profile]]
。不过还是感谢你的帮助 :-) - Daniel Langdon.aggregateByKey(new Profile(), 3200)
,我只是提供了一个零值 :-( - Daniel Langdon