理解Apache Spark RDD任务序列化

10

我正在尝试了解Spark中任务序列化的工作原理,但是我在编写测试代码时遇到了一些混乱的结果,感到有些困惑。

以下是我的一些测试代码(为了方便起见进行了简化),它在多个节点上执行以下操作:

object TestJob {
  def run(): Unit = {
    val rdd = ...
    val helperObject = new Helper() // Helper does NOT impl Serializable and is a vanilla class
    rdd.map(element => {
      helperObject.transform(element)
    }).collect()
  }
}

当我执行run()时,由于helperObject不可序列化,所以出现了“任务不可序列化”异常,这是预期的结果。然而,当我稍微改变一下它,像这样:

trait HelperComponent {
  val helperObject = new Helper()
}

object TestJob extends HelperComponent {
  def run(): Unit = {
    val rdd = ...
    rdd.map(element => {
      helperObject.transform(element)
    }).collect()
  }
}

工作以某种方式成功执行。有人可以帮助我理解这可能是为什么吗?在上述每种情况下,Spark序列化并发送给工作程序的内容是什么?
我正在使用Spark版本2.1.1。
谢谢!
1个回答

5
有人能帮我理解为什么会这样吗?
在你的第一个片段中,helperObject是在run内部声明的局部变量。因此,它将被函数封闭(提升),以便在代码执行的任何地方都可以使用所有信息,正因为如此,Sparks ClosureCleaner会因为你试图序列化它而发出警告。
在你的第二个片段中,该值不再是方法范围内的局部变量,而是类实例的一部分(从技术上讲,这是一个对象声明,但最终会转换为JVM类)。
这对于Spark来说具有意义,因为集群中的所有工作节点都包含执行代码所需的JAR文件。因此,当Spark在其中一个工作进程中启动一个Executor进程时,它将通过ClassLoader本地加载TestObject,并创建一个实例,就像非分布式应用程序中的每个JVM类一样。
总之,你不会看到这个问题爆炸是因为类不再序列化,这是由于你声明类型实例的方式发生了改变。

感谢您的回复,现在我理解了很多。为了进一步澄清,您能帮我理解执行程序运行的每个任务大小是多少吗?术语“任务”是指一个或多个RDD操作(map、filter等),还是指在数据集子集上运行所有RDD操作(即完整的“管道”)? - simonl
1
@simonl 任务是执行的最小单位。通常可以将每个操作(例如mapfilter等)视为单个任务。但实际上,Spark会将这些转换优化为单个任务执行。Spark如何知道哪些任务可以“压缩”在一起?它通常是针对我们称之为“窄转换”的操作进行这些操作,这些操作基本上是惰性函数,返回一个RDD,直到达到数据洗牌的“宽转换”。 - Yuval Itzchakov
@simonl,希望之前的评论有意义,有些东西在500个字符内难以解释清楚。 - Yuval Itzchakov
大部分是的,谢谢 - 你的评论让我找到了这个链接:https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-transformations.html#narrow-transformations如果您有任何关于深入理解此级别的Spark的其他建议资源,我很乐意查看。 - simonl
@simonl 那是一个很好的学习资源。我的做法是深入研究Spark代码库,并查看控制工作分配的各种类。如果你想这样做,可以从查看“DAGScheduler”开始。此外,我推荐阅读《高性能Spark》一书。 - Yuval Itzchakov

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接