Spark Scala: “Task Not serializable” 错误

7

我正在使用带有Scala插件和Spark库的IntelliJ社区版。 我仍在学习Spark并使用Scala工作表。

我编写了下面的代码,用于从字符串中删除标点符号:

def removePunctuation(text: String): String = {
  val punctPattern = "[^a-zA-Z0-9\\s]".r
  punctPattern.replaceAllIn(text, "").toLowerCase
}

然后我读取一个文本文件并尝试去除标点符号:

val myfile = sc.textFile("/home/ubuntu/data.txt",4).map(removePunctuation)

这会产生以下错误,请帮忙解决:
org.apache.spark.SparkException: 任务无法序列化 在org.apache.spark.util.ClosureCleaner$.ensureSerializable(/home/ubuntu/src/main/scala/Test.sc:294) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(/home/ubuntu/src/main/scala/Test.sc:284) at org.apache.spark.util.ClosureCleaner$.clean(/home/ubuntu/src/main/scala/Test.sc:104) at org.apache.spark.SparkContext.clean(/home/ubuntu/src/main/scala/Test.sc:2090) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(/home/ubuntu/src/main/scala/Test.sc:366) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(/home/ubuntu/src/main/scala/Test.sc:365) at org.apache.spark.rdd.RDDOperationScope$.withScope(/home/ubuntu/src/main/scala/Test.sc:147) at #worksheet#.#worksheet#(/home/ubuntu/src/main/scala/Test.sc:108) 导致原因: java.io.NotSerializableException: A$A21$A$A21 序列化堆栈: - 对象不可序列化 (类:class A$A21$A$A21, 值:A$A21$A$A21@62db3891) - 字段 (类:class A$A21$A$A21$$anonfun$words$1, 名称:$outer, 类型:class A$A21$A$A21) - 对象 (类:A$A21$A$A21$$anonfun$words$1, ) 在org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) at org.apache.spark.SparkContext.clean(SparkContext.scala:2094) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.RDD.map(RDD.scala:369) at A$A21$A$A21.words$lzycompute(Test.sc:27) at A$A21$A$A21.words(Test.sc:27) at A$A21$A$A21.get$$instance$$words(Test.sc:27) at A$A21$.main(Test.sc:73) at A$A21.main(Test.sc) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.jetbrains.plugins.scala.worksheet.MyWorksheetRunner.main(MyWorksheetRunner.java:22)
3个回答

12
正如T. Gaweda已经指出的那样,您很可能是在一个不可序列化的类中定义了函数。因为它是一个纯函数,即它不依赖于封闭类的任何上下文,我建议您将其放入一个伴生对象中,该对象应扩展Serializable。这将是Scala等效于Java静态方法的方式:
object Helper extends Serializable {
  def removePunctuation(text: String): String = {
    val punctPattern = "[^a-zA-Z0-9\\s]".r
    punctPattern.replaceAllIn(text, "").toLowerCase
  }
}

6
如@TGaweda所建议,Spark的SerializationDebugger对于识别“从给定对象到有问题的对象的序列化路径”非常有帮助。堆栈跟踪中在“Serialization stack”之前的所有美元符号表明您方法的容器对象存在问题。
虽然在容器类上简单地添加Serializable最为简单,但我更喜欢利用Scala作为函数式语言的特点,将您的函数作为一等公民使用:
sc.textFile("/home/ubuntu/data.txt",4).map { text =>
  val punctPattern = "[^a-zA-Z0-9\\s]".r
  punctPattern.replaceAllIn(text, "").toLowerCase
}

或者,如果您真的想保持事情分开:
val removePunctuation: String => String = (text: String) => {
  val punctPattern = "[^a-zA-Z0-9\\s]".r
  punctPattern.replaceAllIn(text, "").toLowerCase
}
sc.textFile("/home/ubuntu/data.txt",4).map(removePunctuation)

这些选项当然有效,因为Regex是可序列化的,您应该确认一下。
其次但非常重要的是,构建一个Regex很昂贵,因此出于性能考虑,将其从转换中分离出来--可能使用broadcast

1
请问您能详细说明一下您最后的评论吗? - thebluephantom
你好,想请教一个问题:作为复习的一部分,我在Databricks COMMUNITY EDITION上运行了这个问题。现在,那是一个非集群设置,但通常情况下,根据我所看到的,序列化错误会发生。但这次没有出现。它是2.4.4版本 - 怎么想?文件太小?驱动程序方法?为什么会显示其他错误? - thebluephantom

4

阅读堆栈跟踪,可以看到:

$outer,类型:class A$A21$A$A21

这是一个非常好的提示。您的lambda表达式是可序列化的,但您的类不可序列化。

当您创建lambda表达式时,该表达式会引用外部类。在您的情况下,外部类不可序列化,即未实现Serializable或其中一个字段不是Serializable的实例。


顺便说一句,这可能是一个重复的重复的重复... ;) 但是我没有时间去寻找最好的答案来标记问题为重复。如果你找到了一些好的解释,请通知我并将问题标记为重复。 - T. Gawęda
1
@TGaweda:感谢您的回答,但我作为学习阶段的人完全不理解。在发布之前我已经搜索了这个问题。然而,它们都没有详细解释这一切意味着什么以及如何解决。如果您能提出可能的解决方案,那么将来任何遇到这个错误的人都会感激您。 - SarahB
@sumitb 这就是我发表这个答案的原因 :) - T. Gawęda
我认为我没有清楚地发布我的问题。让我再次澄清,我正在“Scala工作表”中编写所有内容。外部对象是否可序列化并不成问题。如果我创建一个带有主方法的Scala对象/类,并编写完全相同的代码,则可以正常工作。我请求找出如何在工作表中测试此功能。我很乐意听取一些建议。 - SarahB
1
@T.Gawęda,您能否解释一下如何读取序列化堆栈,特别是如何从序列化堆栈的“$outer,type:class A $ A21 $ A $ A21”部分推断出某些内容?相关问题:https://dev59.com/ArLma4cB1Zd3GeqPWizF - Omkar Neogi

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接