我正在尝试了解Spark中任务序列化的工作原理,但是我在编写测试代码时遇到了一些混乱的结果,感到有些困惑。
以下是我的一些测试代码(为了方便起见进行了简化),它在多个节点上执行以下操作:
object TestJob {
def run(): Unit = {
val rdd = ...
val helperObject = new Helper() // Helper does NOT impl Serializable and is a vanilla class
rdd.map(element => {
helperObject.transform(element)
}).collect()
}
}
当我执行run()
时,由于helperObject
不可序列化,所以出现了“任务不可序列化”异常,这是预期的结果。然而,当我稍微改变一下它,像这样:
trait HelperComponent {
val helperObject = new Helper()
}
object TestJob extends HelperComponent {
def run(): Unit = {
val rdd = ...
rdd.map(element => {
helperObject.transform(element)
}).collect()
}
}
工作以某种方式成功执行。有人可以帮助我理解这可能是为什么吗?在上述每种情况下,Spark序列化并发送给工作程序的内容是什么?
我正在使用Spark版本2.1.1。
谢谢!
map
,filter
等)视为单个任务。但实际上,Spark会将这些转换优化为单个任务执行。Spark如何知道哪些任务可以“压缩”在一起?它通常是针对我们称之为“窄转换”的操作进行这些操作,这些操作基本上是惰性函数,返回一个RDD
,直到达到数据洗牌的“宽转换”。 - Yuval Itzchakov