为什么Spark会出现java.lang.OutOfMemoryError: GC overhead limit exceeded错误?

60

我正在尝试实现一个在Spark中运行良好的Hadoop Map/Reduce作业。Spark应用程序定义如下:

val data = spark.textFile(file, 2).cache()
val result = data
  .map(//some pre-processing)
  .map(docWeightPar => (docWeightPar(0),docWeightPar(1))))
  .flatMap(line => MyFunctions.combine(line))
  .reduceByKey( _ + _)

我的函数 MyFunctions.combine 在哪里

def combine(tuples: Array[(String, String)]): IndexedSeq[(String,Double)] =
  for (i <- 0 to tuples.length - 2;
       j <- 1 to tuples.length - 1
  ) yield (toKey(tuples(i)._1,tuples(j)._1),tuples(i)._2.toDouble * tuples(j)._2.toDouble)
combine函数生成的键值对数量取决于输入列表的大小,当输入列表很大时,可能会抛出异常。在Hadoop Map Reduce环境中,我没有遇到问题,因为combine函数产生输出结果的时候,Hadoop会将映射键值对写入磁盘。但是Spark似乎会一直在内存中保存,直到遇到java.lang.OutOfMemoryError: GC overhead limit exceeded错误。我可能做错了一些基本的东西,但是我找不到解决方法。由于我对Scala和Spark都不太了解,所以我不确定问题是由其中一个还是两个共同导致的。目前我正在尝试在自己的笔记本电脑上运行这个程序,对于长度不太长的tuples数组输入,它可以正常工作。

“data” 已经是 RDD 了吗? - WestCoastProjects
我刚刚编辑了代码,以展示我如何加载数据。 - Augusto
好的,我已经假设它确实是一个RDD(就像你展示的那样)并回答了它。 - WestCoastProjects
希望在您的帖子结尾处看到最终解决方案,谢谢! - Traveler
最终的解决方案是我标记为被接受答案的那一个 :) - Augusto
5个回答

18

在启动spark-shellspark-submit时,请添加以下JVM参数:

-Dspark.executor.memory=6g

当创建SparkContext实例时,您可以考虑明确设置工作器的数量:

分布式集群

conf/slaves中设置从节点名称:

val sc = new SparkContext("master", "MyApp")

1
不幸的是,内存选项并没有起到帮助作用。我仍然会收到“内存不足”的异常提示。 - Augusto
2
在我添加了这个选项并将最小分区更改为100 spark.textFile(conceptsFile,100).cache()之后,似乎运行时间更长,但最终出现了java.lang.OutOfMemoryError:Java堆空间的错误。 - Augusto
如果您更改spark.executor.memory=12g,它是否会运行更长时间?您的系统有多少内存可分配给工作进程?您可能需要在conf/slaves文件中添加更多的工作进程。 - WestCoastProjects
8
嗨 @javadba,看起来实际上是我试图在内存中组装整个排列数组导致了问题。使combine函数返回一个迭代器似乎解决了这个问题。感谢您的时间! - Augusto
关于在Spark Shell 2.2中使用,我添加了-Dspark.executor.memory=6g,但是在“6g”处出现了“错误:无效的文字数字”。通过sshell -i script.scala运行脚本。 - Peter Krauss

17
在文档中(http://spark.apache.org/docs/latest/running-on-yarn.html),您可以了解如何配置执行器和内存限制。
例如:
--master yarn-cluster --num-executors 10 --executor-cores 3 --executor-memory 4g --driver-memory 5g  --conf spark.yarn.executor.memoryOverhead=409

内存开销应该是执行器内存的10%。

编辑:将4096更正为409(下面的评论是指此处)


5
4G的10%应该大约是410(M),对吗? - piggybox
1
是的,应该大约是10%,抱歉。所以:spark.yarn.executor.memoryOverhead=409 - Carlos AG
1
配置键'spark.yarn.executor.memoryOverhead'自Spark 2.3起已被弃用,并可能在将来被删除。请改用新键'spark.executor.memoryOverhead'。 - K.S.

15

调整内存可能是一个不错的选择,正如已经建议的那样,因为这是一种在规模上很难看的昂贵操作。但也许一些代码修改会有所帮助。

您可以在合并函数中采用不同的方法,通过使用combinations函数避免使用if语句。在组合操作之前,我还会将元组的第二个元素转换为双精度浮点数:

tuples.

    // Convert to doubles only once
    map{ x=>
        (x._1, x._2.toDouble)
    }.

    // Take all pairwise combinations. Though this function
    // will not give self-pairs, which it looks like you might need
    combinations(2).

    // Your operation
    map{ x=>
        (toKey(x{0}._1, x{1}._1), x{0}._2*x{1}._2)
    }

这将提供一个迭代器,您可以将其向下使用,或者如果需要,可以使用toList将其转换为列表(或其他内容)。


嗨@ohruunuruus,我认为滑动不提供我想要做的相同行为。toKey函数只是将两个字符串组合在一起。 - Augusto
1
好的,我得等到我能够访问spark-shell时才能看一下。无论如何,您可以探索执行合并操作的其他方法。 - mattsilver
嗨@ohruunuruus,那个方法很有效,我认为返回迭代器实际上是解决问题的关键,因为我的for循环试图在内存中组装一个巨大的数组,但由于内存不足而失败。我知道我的for循环看起来不太像Scala,但我昨天才开始学习,所以那是我能做的。感谢您的时间! - Augusto

3

在长时间的回归拟合过程中,我遇到了相同的问题。我缓存了训练集和测试集,这解决了我的问题。

train_df, test_df = df3.randomSplit([0.8, 0.2], seed=142)
pipeline_model = pipeline_object.fit(train_df)

pipeline_model出现了java.lang.OutOfMemoryError: GC overhead limit exceeded的错误。 但是当我使用了

train_df, test_df = df3.randomSplit([0.8, 0.2], seed=142)
train_df.cache()
test_df.cache()
pipeline_model = pipeline_object.fit(train_df)

它起作用了。


2

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接