为什么这段Spark代码会抛出java.io.NotSerializableException

3

我想在RDD的转换中访问一个伴生对象的方法。为什么以下代码无法正常工作:

import org.apache.spark.rdd.RDD
import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}

class Abc {
    def transform(x: RDD[Int]): RDD[Double] = { x.map(Abc.fn) }
}

object Abc {
  def fn(x: Int): Double = { x.toDouble }
}

implicit def abcEncoder: Encoder[Abc] = Encoders.kryo[Abc]

new Abc().transform(sc.parallelize(1 to 10)).collect

上面的代码会抛出一个java.io.NotSerializableException异常:
org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.map(RDD.scala:369)
  at Abc.transform(<console>:19)
  ... 47 elided
Caused by: java.io.NotSerializableException: Abc
Serialization stack:
        - object not serializable (class: Abc, value: Abc@4f598dfb)
        - field (class: Abc$$anonfun$transform$1, name: $outer, type: class Abc)
        - object (class Abc$$anonfun$transform$1, <function1>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
  ... 57 more

即使为Abc类定义了一个Encoder也无济于事。但更重要的问题是,为什么会尝试序列化Abc类的对象?我的第一个想法是,伴生对象是该类的单例对象,因此可能存在序列化它的尝试。但这似乎不是情况,因为当我从另一个类调用Abc.fn时:

class Xyz {
    def transform(x: RDD[Int]): RDD[Double] = { x.map(Abc.fn) }
}

implicit def xyzEncoder: Encoder[Xyz] = Encoders.kryo[Xyz]

new Xyz().transform(sc.parallelize(1 to 10)).collect

我得到了一个java.io.NotSerializableException:Xyz。

2
工作不会在边缘节点上发生;类(或对象)必须被序列化,以便数据节点可以运行它。 - Elliott Frisch
因为您实际上没有定义序列化/反序列化函数,更不用说实现正确的接口了。(https://docs.oracle.com/javase/7/docs/api/java/io/NotSerializableException.html) 默认情况下,序列化只能访问公共设置和可获取的内容。对于任何超出此范围的内容,您需要提供自己的函数。 - Christopher
2个回答

3
以下是该文章的翻译:

这是一篇关于Apache Spark中"可序列化"与"不可序列化"对象之间区别的优秀文章:

在Apache Spark中使用不可序列化对象,作者Nicola Ferraro

该文章提供了以下几点建议:

  • 针对您的特定情况正在发生什么

  • 一些替代方案,以便您的对象无需“可序列化”


2

Spark的主要抽象是RDD,它们被分区在集群的节点上。因此,当我们运行RDD时,它会被序列化到驱动程序节点并分发到其他适当的节点。然后工作节点对其进行反序列化并执行。

在您的情况下,类ABC无法被序列化并分发到其他工作节点。您需要使用Serializable将Class ABC序列化。

class Abc with Serializable{
    def transform(x: RDD[Int]): RDD[Double] = { x.map(Abc.fn) }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接