理解Spark的闭包及其序列化

19

免责声明:刚开始尝试使用Spark。

我在理解著名的“任务不可串行化”异常上遇到了麻烦,但我的问题与我在SO(或者我认为如此)看到的问题有些不同。

我有一个微小的自定义RDD(TestRDD)。它有一个字段用于存储那些未实现Serializable接口的对象(NonSerializable类)。我已将“spark.serializer”配置选项设置为使用Kryo。然而,当我尝试在我的RDD上使用count()函数时,我得到以下结果:

Caused by: java.io.NotSerializableException: com.complexible.spark.NonSerializable
Serialization stack:
- object not serializable (class: com.test.spark.NonSerializable, value: com.test.spark.NonSerializable@2901e052)
- field (class: com.test.spark.TestRDD, name: mNS, type: class com.test.spark.NonSerializable)
- object (class com.test.spark.TestRDD, TestRDD[1] at RDD at TestRDD.java:28)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (TestRDD[1] at RDD at TestRDD.java:28,<function2>))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1009)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:933)
当我查看DAGScheduler.submitMissingTasks的内部时,我发现它在我的RDD上使用了它的闭包序列化程序,这是Java序列化程序,而不是我所期望的Kryo序列化程序。我已经阅读过 Kryo 在序列化闭包方面存在问题,并且Spark总是对闭包使用Java序列化器,但我并不完全理解闭包在这里的作用。我在这里做的只是:
SparkConf conf = new SparkConf()
                         .setAppName("ScanTest")
                         .setMaster("local")
                         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

JavaSparkContext sc = new JavaSparkContext(conf);

TestRDD rdd = new TestRDD(sc.sc());
System.err.println(rdd.count());

也就是说,没有映射器或任何需要序列化闭包的东西。 另一方面,这段代码是可行的:

sc.parallelize(Arrays.asList(new NonSerializable(), new NonSerializable())).count()

使用Kryo序列化器,不涉及闭包序列化器。如果我没有将序列化器属性设置为Kryo,我也会在这里遇到异常。

我很感激任何指向解释闭包的来源以及如何确保我可以使用Kryo来序列化自定义RDD的指针。

更新:这是具有其非可序列化字段mNS的TestRDD:

class TestRDD extends RDD<String> {

    private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class);

    NonSerializable mNS = new NonSerializable();

    public TestRDD(final SparkContext _sc) {
        super(_sc,
              JavaConversions.asScalaBuffer(Collections.<Dependency<?>>emptyList()),
              STRING_TAG);
    }

    @Override
    public Iterator<String> compute(final Partition thePartition, final TaskContext theTaskContext) {
        return JavaConverters.asScalaIteratorConverter(Arrays.asList("test_" + thePartition.index(),
                                                                     "test_" + thePartition.index(),
                                                                     "test_" + thePartition.index()).iterator()).asScala();
    }

    @Override
    public Partition[] getPartitions() {
        return new Partition[] {new TestPartition(0), new TestPartition(1), new TestPartition(2)};
    }

    static class TestPartition implements Partition {

        final int mIndex;

        public TestPartition(final int theIndex) {
            mIndex = theIndex;
        }

        public int index() {
            return mIndex;
        }
    }
}

你的TestRDD中是否有一个保存SparkContext的字段?请展示你的TestRDD定义或创建一个[MCVE]。 - Yuval Itzchakov
@YuvalItzchakov 在这里。SparkContext被传递到super的构造函数中,所以是的,RDD确实持有它。但异常似乎并没有抱怨那个。 - Pavel Klinov
你能发布“NonSerializable”吗? - Yuval Itzchakov
就像 class NonSerializable {} 一样简单。 - Pavel Klinov
1个回答

11
当我查看 DAGScheduler.submitMissingTasks 的内部时,我发现它使用了闭包序列化器对我的 RDD 进行序列化,其中使用的是Java序列化器,而不是我期望的Kryo序列化器。 SparkEnv 支持两个序列化器,一个名为 serializer,用于数据序列化、检查点、工作节点之间的消息传递等,可在 spark.serializer 配置标志下使用。另一个称为 closureSerializer,在 Spark <= 1.6.2 中可配置(但除 JavaSerializer 外其他都无效),用于检查对象是否可序列化。自 Spark 2.0.0 起,该序列化器被硬编码为 JavaSerializer。
Kryo 闭包序列化器存在错误,因此不能使用,可以在 SPARK-7708 中查看此错误(可能已在 Kryo 3.0.0 中修复,但 Spark 目前固定在 Chill 的特定版本上,该版本固定在 Kryo 2.2.1 上)。对于 Spark 2.0.x,JavaSerializer 现已被固定,而非可配置(请参见 此拉取请求)。这意味着实际上我们只能使用 JavaSerializer 进行闭包序列化。
我们在提交任务和在工作节点之间传递数据时使用不同的序列化器是否奇怪?当然是,但这就是我们拥有的东西。
总而言之,如果您设置了 spark.serializer 配置或使用 SparkContext.registerKryoClasses,则大多数情况下会使用 Kryo 进行序列化。尽管如此,在检查给定类是否可序列化以及将任务序列化到工作节点时,Spark 将使用 JavaSerializer。

谢谢,但是它为什么不正确呢?我可以看到DAGScheduler使用的是closureSerializer字段,而不是serializer字段。无论我是否将环境设置为使用Kryo,SparkEnv.get.closureSerializer始终是Java序列化程序(如果我没记错的话,他们甚至从2.0中删除了选项spark.closure.serializer,因为它被忽略了),所以我知道为什么会失败。问题是不同的:为什么调度程序在我的情况下使用闭包序列化程序?我如何让它为我的RDD>使用Kryo? - Pavel Klinov
嗯,我相信这个语句对于Spark 2.0.0和2.0.1是正确的(从堆栈跟踪中可以看出)。您还可以检查SPARK-12414。closureSerializer可能是一个抽象类型,但据我所知只使用了一种实现。 - Pavel Klinov
@PavelKlinov 你说得对。我深入研究了一下,看看我的更新。 - Yuval Itzchakov
@PavelKlinov 如果Spark甚至没有达到Kryo序列化它们的点,那还有什么意义呢?Kryo用于序列化,只是没有用于检查对象是否有效地可序列化。我同意这是一个怪癖。如果我们正在使用Kryo在工作节点之间序列化一个类,那么为什么要根据JavaSerializer规则进行验证呢? - Yuval Itzchakov
2
如果您有任何不可序列化的属性,常见做法是将其标记为@transient,并让工作线程懒加载它。 - Yuval Itzchakov
显示剩余5条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接