免责声明:刚开始尝试使用Spark。
我在理解著名的“任务不可串行化”异常上遇到了麻烦,但我的问题与我在SO(或者我认为如此)看到的问题有些不同。
我有一个微小的自定义RDD(TestRDD
)。它有一个字段用于存储那些未实现Serializable接口的对象(NonSerializable
类)。我已将“spark.serializer”配置选项设置为使用Kryo。然而,当我尝试在我的RDD上使用count()
函数时,我得到以下结果:
Caused by: java.io.NotSerializableException: com.complexible.spark.NonSerializable
Serialization stack:
- object not serializable (class: com.test.spark.NonSerializable, value: com.test.spark.NonSerializable@2901e052)
- field (class: com.test.spark.TestRDD, name: mNS, type: class com.test.spark.NonSerializable)
- object (class com.test.spark.TestRDD, TestRDD[1] at RDD at TestRDD.java:28)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (TestRDD[1] at RDD at TestRDD.java:28,<function2>))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1009)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:933)
当我查看DAGScheduler.submitMissingTasks
的内部时,我发现它在我的RDD上使用了它的闭包序列化程序,这是Java序列化程序,而不是我所期望的Kryo序列化程序。我已经阅读过 Kryo 在序列化闭包方面存在问题,并且Spark总是对闭包使用Java序列化器,但我并不完全理解闭包在这里的作用。我在这里做的只是:SparkConf conf = new SparkConf()
.setAppName("ScanTest")
.setMaster("local")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
JavaSparkContext sc = new JavaSparkContext(conf);
TestRDD rdd = new TestRDD(sc.sc());
System.err.println(rdd.count());
也就是说,没有映射器或任何需要序列化闭包的东西。 另一方面,这段代码是可行的:
sc.parallelize(Arrays.asList(new NonSerializable(), new NonSerializable())).count()
使用Kryo序列化器,不涉及闭包序列化器。如果我没有将序列化器属性设置为Kryo,我也会在这里遇到异常。
我很感激任何指向解释闭包的来源以及如何确保我可以使用Kryo来序列化自定义RDD的指针。
更新:这是具有其非可序列化字段mNS的TestRDD:
class TestRDD extends RDD<String> {
private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class);
NonSerializable mNS = new NonSerializable();
public TestRDD(final SparkContext _sc) {
super(_sc,
JavaConversions.asScalaBuffer(Collections.<Dependency<?>>emptyList()),
STRING_TAG);
}
@Override
public Iterator<String> compute(final Partition thePartition, final TaskContext theTaskContext) {
return JavaConverters.asScalaIteratorConverter(Arrays.asList("test_" + thePartition.index(),
"test_" + thePartition.index(),
"test_" + thePartition.index()).iterator()).asScala();
}
@Override
public Partition[] getPartitions() {
return new Partition[] {new TestPartition(0), new TestPartition(1), new TestPartition(2)};
}
static class TestPartition implements Partition {
final int mIndex;
public TestPartition(final int theIndex) {
mIndex = theIndex;
}
public int index() {
return mIndex;
}
}
}
TestRDD
中是否有一个保存SparkContext
的字段?请展示你的TestRDD
定义或创建一个[MCVE]。 - Yuval ItzchakovSparkContext
被传递到super的构造函数中,所以是的,RDD确实持有它。但异常似乎并没有抱怨那个。 - Pavel Klinovclass NonSerializable {}
一样简单。 - Pavel Klinov