在Scala中,Map不能被序列化吗?

33

我是Scala的新手。为什么“map”函数不可序列化?如何使它可序列化?例如,如果我的代码如下:

val data = sc.parallelize(List(1,4,3,5,2,3,5))

def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
  val lst = List(("a", 1),("b", 2),("c",3), ("a",2))
  var res = List[Int]()
  while (iter.hasNext) {
    val cur = iter.next
    val a = lst.groupBy(x => x._1).mapValues(_.size)
    //val b= a.map(x => x._2)
    res = res ::: List(cur)
  }
  res.iterator
}

data.mapPartitions(myfunc).collect

如果我取消注释这一行

val b= a.map(x => x._2)

该代码返回异常:

org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: scala.collection.immutable.MapLike$$anon$2
Serialization stack:
    - object not serializable (class: scala.collection.immutable.MapLike$$anon$2, value: Map(1 -> 3))
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: a, type: interface scala.collection.immutable.Map)

非常感谢。


你好 zero323,我在 Spark 1.5 自带的 Scala shell 中直接运行了这段代码。我还在 Spark 1.0.1 的 Scala shell 中运行了该代码,但是出现了相同的问题。 - Carter
1
我怀疑这不是导致错误的实际代码?你的lst在实际代码中真的只是一个普通的列表吗?还是另一个RDD? - The Archetypal Paul
可能是与 NotSerializableException for Map[String, String] alias 的问题相似。 - zero323
@zero323 是的,我可以在Spark 1.5的Scala控制台中重现相同的问题(我下载了编译好的Spark 1.5,解压缩后使用“./bin/spark-shell”命令启动Scala shell,并将这段代码复制粘贴到shell中)。 - Carter
你是否遇到了相同的问题和代码?如果没有,能否告诉我你是如何运行它的?非常感谢。 - Carter
显示剩余10条评论
3个回答

72

众所周知的 Scala bug:https://issues.scala-lang.org/browse/SI-7005,Map#mapValues 不可序列化。

我们在 Spark 应用程序中也遇到了这个问题,map(identity) 解决了这个问题。

rdd.groupBy(_.segment).mapValues(v => ...).map(identity)

1
这也是我的假设,但根据 OP 的说法,它并没有解决问题。 - zero323
2
这对我有用。谢谢!我永远不会想到这个。为什么这样可以运行? - Matthew Jones
@Mattew 因为 mapValues 是惰性的(https://dev59.com/HJrga4cB1Zd3GeqPrLkD) - Federico
我知道这可能听起来很奇怪,但是在 .mapValues(v => ...) 的 (...) 中应该放什么?我的 Map[String, List[Set[String]]]。 - Danny Liu
1
当前错误任务/描述的链接:https://github.com/scala/bug/issues/7005 - juanmirocks
@DannyLiu 那里放什么并不重要。重点是任何对 mapValues 的调用都会创建一个不可序列化的视图。解决方法是添加 .map(identity),这样就强制将视图变为具体的。 - Synesso

5
下面提供了 mapValues 函数的实际实现,可以看到它不可序列化并且只创建一个视图,而不是数据的真正存在,因此您会收到此错误。就情况而言,mapValues 可以有许多优点。
protected class MappedValues[C](f: B => C) extends AbstractMap[A, C] with DefaultMap[A, C] {
    override def foreach[D](g: ((A, C)) => D): Unit = for ((k, v) <- self) g((k, f(v)))
    def iterator = for ((k, v) <- self.iterator) yield (k, f(v))
    override def size = self.size
    override def contains(key: A) = self.contains(key)
    def get(key: A) = self.get(key).map(f)
}

1

你尝试在应用程序中运行过这段代码吗?我怀疑这是 Spark Shell 的问题。如果你想让它在 Spark Shell 中工作,那么你可以尝试用花括号包装 myfunc 的定义和应用程序,像这样:

val data = sc.parallelize(List(1,4,3,5,2,3,5))

val result = { 
  def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
    val lst = List(("a", 1),("b", 2),("c",3), ("a",2))
    var res = List[Int]()
    while (iter.hasNext) {
      val cur = iter.next
      val a = lst.groupBy(x => x._1).mapValues(_.size)
      val b= a.map(x => x._2)
      res = res ::: List(cur)
    }
    res.iterator
  }
  data.mapPartitions(myfunc).collect
}

我只在Spark Shell中尝试了这段代码。看起来问题出在Shell上。 - Carter
当我们将其包装在val中时,发生了什么?为什么这样做可以修复它? - Geoff Langenderfer

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接