为什么Spark Dataset.map需要查询的所有部分都可以序列化?

3

我想使用Dataset.map函数来转换我的数据集中的行。示例如下:

val result = testRepository.readTable(db, tableName)
  .map(testInstance.doSomeOperation)
  .count()

其中testInstance是一个扩展了java.io.Serializable的类,但是testRepository并没有继承这个类。代码会抛出以下错误:

Job aborted due to stage failure.
Caused by: NotSerializableException: TestRepository

问题

我理解为什么 testInstance.doSomeOperation 需要序列化,因为它在 map 内部并将分发到 Spark worker。但是为什么需要对 testRepository 进行序列化呢?我不明白这对于 map 来说是必要的。将其定义更改为 class TestRepository extends java.io.Serializable 可以解决此问题,但在项目的更大上下文中并不理想。

是否有一种方法可以在不使 TestRepository 序列化的情况下使其正常工作,或者为什么需要进行序列化?

最小工作示例

以下是包含两个类代码的完整示例,可重现 NotSerializableException:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

case class MyTableSchema(id: String, key: String, value: Double)
val db = "temp_autodelete"
val tableName = "serialization_test"

class TestRepository extends java.io.Serializable {
  def readTable(database: String, tableName: String): Dataset[MyTableSchema] = {
    spark.table(f"$database.$tableName")
    .as[MyTableSchema]
  }
}

val testRepository = new TestRepository()

class TestClass() extends java.io.Serializable {
  def doSomeOperation(row: MyTableSchema): MyTableSchema = {
  row 
  }
}

val testInstance = new TestClass()

val result = testRepository.readTable(db, tableName)
  .map(testInstance.doSomeOperation)
  .count()

1
尝试使用 object TestRepository - botchniaque
感谢您的回复。这个例子可以运行,但在这个简化的例子之外是不实际的。我想要探究的主要问题是为什么它需要首先进行序列化?难道只有进入.map()的代码需要序列化吗? - RvdV
这是一个公正的问题,但我无法回答。抱歉。 - botchniaque
1个回答

0

原因是因为您的map操作正在从已经在执行器上发生的内容中读取。

如果您查看您的管道:

val result = testRepository.readTable(db, tableName)
  .map(testInstance.doSomeOperation)
  .count()

首先要做的是testRepository.readTable(db, tableName)。如果我们查看readTable方法的内部,我们会发现在其中进行了一个spark.table操作。如果我们从API文档中查看此方法的函数签名,我们会看到以下函数签名:

def table(tableName: String): DataFrame

这不仅仅是在驱动程序上进行的操作(想象一下读取一个>1TB文件,而只在驱动程序上进行操作),它还创建了一个Dataframe(本身就是分布式数据集)。这意味着testRepository.readTable(db, tableName)函数需要分布式,并且您的testRepository对象需要分布式。

希望这可以帮助您!


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接