Spark java.lang.OutOfMemoryError: Java heap space

290

我的集群:1个主节点,11个从节点,每个节点都有6GB的内存。

我的设置:

spark.executor.memory=4g, Dspark.akka.frameSize=512

问题如下:

首先,我从HDFS中读取了一些数据(2.19 GB)到RDD中:

val imageBundleRDD = sc.newAPIHadoopFile(...)

其次,对这个RDD执行一些操作:

val res = imageBundleRDD.map(data => {
                               val desPoints = threeDReconstruction(data._2, bg)
                                 (data._1, desPoints)
                             })

最后,输出到HDFS:

res.saveAsNewAPIHadoopFile(...)

当我运行我的程序时,它会显示:

.....
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL)
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL)
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:36 as 30618515 bytes in 449 ms
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Starting task 1.0:32 as TID 35 on executor 7: Salve4.Hadoop (NODE_LOCAL)
Uncaught error from thread [spark-akka.actor.default-dispatcher-3] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark]
java.lang.OutOfMemoryError: Java heap space

任务太多了吗?

PS:当输入数据约为225MB时,一切正常。

我该如何解决这个问题?


如何运行Spark?是从控制台吗?还是使用哪些部署脚本? - Tombart
我使用sbt编译和运行我的应用程序。sbt package然后sbt run。一个月前,我在hadoop上实现了相同的程序,并遇到了OutOfMemoryError的问题,但在hadoop中,可以通过将mapred.child.java.opts的值从Xmx200m增加到Xmx400m来轻松解决。Spark是否有任何JVM设置来处理其任务?我想知道spark.executor.memory是否与hadoop中的mapred.child.java.opts具有相同的含义。在我的程序中,spark.executor.memory已经设置为4g,比hadoop中的Xmx400m大得多。谢谢~ - Hellen
你提到的三个步骤是唯一需要执行的吗?(data._1, desPoints) 生成的数据大小是多少?如果将此数据随后洗牌到另一个阶段,它应该适合内存。 - Arnon Rotem-Gal-Oz
2
驱动程序的内存配置是什么?检查哪个服务器出现了内存不足错误。是驱动程序还是其中一个执行器。 - RanP
查看所有配置属性请点击此链接:https://spark.apache.org/docs/2.1.0/configuration.html - Naramsim
@hequn8128 如果您能在这里接受一个答案的话,将来参考起来会很方便。说实话,我个人比较喜欢samthebest的回答。 - Naman
14个回答

453

我有几个建议:

  • 如果您的节点配置为Spark最大6G(并且为其他进程保留一点),则使用spark.executor.memory=6g而不是4G。通过检查UI确保尽可能利用内存(它会显示您正在使用多少内存)。
  • 尝试使用更多分区,每个CPU应该有2-4个。增加分区数往往是使程序更稳定(通常更快)的最简单方法。对于大量数据,您可能需要比每个CPU 4个更多的分区,我在某些情况下必须使用8000个分区!
  • 减少缓存保留的内存比例,使用spark.storage.memoryFraction。如果您的代码中没有使用cache()persist,则此值可能为0。默认值为0.6,这意味着您的堆只能获得0.4 * 4G内存。降低内存分数经常使OOMs消失。 更新:从Spark 1.6开始,显然我们将不再需要使用这些值,Spark将自动确定它们。
  • 类似于上面的shuffle memory fraction。如果您的作业不需要太多的洗牌内存,则将其设置为较低的值(这可能会导致您的洗牌溢出到磁盘,这可能对速度产生灾难性的影响)。有时候当洗牌操作OOM时,您需要执行相反的操作,即将其设置为像0.8这样的大值,或者确保允许您的洗牌溢出到磁盘(自1.0.0以来默认情况下)。
  • 注意内存泄漏,这些内存泄漏通常是由于在lambda中意外地关闭不需要的对象造成的。诊断方法是查看日志中的“任务序列化为XXX字节”,如果XXX大于几k或大于MB,则可能会发生内存泄漏。请参见https://dev59.com/7GEh5IYBdhLWcg3wTBtc#25270600
  • 与上述相关;如果您确实需要大型对象,请使用broadcast variables
  • 如果您正在缓存大型RDD并且可以牺牲一些访问时间,请考虑序列化RDDhttp://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage。甚至可以将它们缓存在磁盘上(如果使用SSD的话有时并不那么糟糕)。
  • 高级)与上述相关,请避免String和嵌套结构(如Map和嵌套的case类)。如果可能,请尽量仅使用基本类型并索引所有非基元类型,特别是如果您预计会有很多重复项。在可能的情况下,选择WrappedArray而不是嵌套结构。甚至可以自己实现序列化-您将拥有最多关于如何将数据有效地备份到字节中的信息,请利用它
  • 有点骇人听闻)再次缓存时,考虑使用Dataset将您的结构缓存在其中,因为它将使用更有效的序列化。与前一个项目相比,这应被视为一种黑客技巧。通过将您的领域知识构建到您的算法/序列化中,可以将内存/缓存空

    http://spark.apache.org/docs/1.2.1/configuration.html

    编辑:(这样我可以更容易地通过谷歌搜索)以下内容也表明了这个问题:

    java.lang.OutOfMemoryError : GC overhead limit exceeded
    

3
@samthebest 这个回答太棒了。我非常感谢你提供的日志记录帮助来找到内存泄漏问题。 - Myles Baker
1
嗨@samthebest,您是如何指定8000个分区的?由于我正在使用Spark SQL,我只能使用spark.sql.shuffle.partitions来指定分区,默认值为200。我应该将其设置为更多吗?我尝试将其设置为1000但无法解决OOM问题。您知道最佳分区值应该是多少吗?我有1TB偏斜数据需要处理,并且涉及到基于hive的group by查询。请给予指导。 - Umesh K
2
嗨@user449355,请您发一个新的问题好吗?因为长篇评论可能会让事情变得混乱:) 如果你正在遇到问题,很可能其他人也有同样的问题,并且发布一个问题将使所有人更容易找到答案。 - samthebest
1
针对你的第一个观点,@samthebest,你不应该使用所有的内存用于 spark.executor.memory,因为你肯定需要一些内存用于 I/O 开销。如果你全部使用,它会减缓程序运行速度。唯一的例外是 Unix,因为它有交换空间。 - makansij
1
WrappedArrayArray更加规矩,例如相等性和toString的功能正常,但在内存使用方面与Array相似。我还模糊地记得Array和Spark在过去的某个时候不太兼容。说实话,我已经回答这个问题有10年了,所以无法准确回忆起当时的动机 :( @combinatorist - undefined
显示剩余11条评论

79

为了补充一个通常不被讨论的用例,我将提出一种解决方案,当通过 spark-submit本地模式下提交 Spark 应用程序时。

根据 Mastering Apache Spark 一书作者Jacek Laskowski 的说法:

您可以在本地模式下运行 Spark。在这种非分布式单 JVM 部署模式中,Spark 在同一个 JVM 中生成所有执行组件 - 驱动程序、执行器、后端和主节点。这是唯一使用驱动程序进行执行的模式。

因此,如果您遇到 heapOOM 错误,则只需调整 driver-memory 而不是 executor-memory

以下是一个示例:

spark-1.6.1/bin/spark-submit
  --class "MyClass"
  --driver-memory 12g
  --master local[*] 
  target/scala-2.10/simple-project_2.10-1.0.jar 

在独立模式下,我们应该考虑多少百分比的驱动程序内存? - Yashwanth Kambala
@Brian,本地模式下,驱动程序内存是否需要大于输入数据大小?是否可以指定输入数据集的分区数,以便Spark作业可以处理比可用RAM大得多的数据集? - fuyi
1
驱动程序内存不能大于输入大小。考虑到您需要将一个160GB的文件加载到您的集群中,那么您会创建一个161GB的驱动程序吗?这是不可行的。您需要通过查看yarn UI和分配给您的集群内存来确定执行器数量、它们的内存以及用于超额内存和操作系统的缓冲区。为了获得更好的性能,您还需要考虑执行器核心,它们应该始终在3-5之间。 - whatsinthename

44
您应该按照下面所示的方式配置offHeap内存设置:
val spark = SparkSession
     .builder()
     .master("local[*]")
     .config("spark.executor.memory", "70g")
     .config("spark.driver.memory", "50g")
     .config("spark.memory.offHeap.enabled",true)
     .config("spark.memory.offHeap.size","16g")   
     .appName("sampleCodeForReference")
     .getOrCreate()

根据您计算机的RAM可用性,为驱动程序内存和执行器内存分配适当的内存大小。如果仍然遇到OutOfMemory问题,您可以增加offHeap大小。


添加了offHeap设置,有所帮助。 - kennyut
4
在代码中设置驱动程序内存无效,请阅读Spark文档以了解更多信息: Spark属性主要可以分为两种:一种与部署相关,例如“spark.driver.memory”,“spark.executor.instances”,这些属性可能不受通过SparkConf在运行时编程设置的影响,或者其行为取决于您选择的集群管理器和部署模式,因此建议通过配置文件或spark-submit命令行选项来设置。 - Abdulhafeth Sartawi
3
最佳答案!我的问题是Spark没有安装在主节点上,我只是使用PySpark连接到HDFS并得到了相同的错误。使用“config”解决了这个问题。 - Mikhail_Sam
1
我刚刚使用spark-submit命令添加了配置以解决堆大小问题。谢谢。 - Pritam Sadhukhan
3
请注意,对于pyspark用户,.config("spark.memory.offHeap.enabled",true)应更改为.config("spark.memory.offHeap.enabled","true") - scottlittle

18
你应该增加驱动器内存。在你的 $SPARK_HOME/conf 文件夹中,你应该找到文件 spark-defaults.conf,编辑并设置 spark.driver.memory 4000m,根据主节点上的内存情况而定,我想。 这就是我解决问题的方法,现在一切都运行得很顺畅。

在独立模式下,应分配多少百分比的内存? - Yashwanth Kambala

15

看一下启动脚本,Java堆大小在那里设置,看起来你在运行Spark worker之前没有设置它。

# Set SPARK_MEM if it isn't already set since we also use it for this process
SPARK_MEM=${SPARK_MEM:-512m}
export SPARK_MEM

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"

您可以在这里找到部署脚本的文档。


谢谢,我稍后会尝试。从Spark UI上可以看到每个执行器的内存为4096。那么这个设置已经启用了,对吧? - Hellen
我在遇到类似问题的时候看到了你的答案(http://stackoverflow.com/questions/34762432/spark-ignores-spark-worker-memory)。根据你提供的链接,似乎不再需要设置Xms / Xmx,你能告诉我为什么吗? - Seffy
抱歉,与“启动脚本”相关的脚本链接中的内容已经更改。截至2019-12-19,不存在这样的选项。 - David Groomes

14

在使用动态资源分配时,我遭遇了这个问题很多次。我曾经以为它会最大程度地利用我的集群资源来适应应用程序

但事实是,动态资源分配没有设置驱动器内存,并将其保留为默认值,即1G。

我通过将 spark.driver.memory 设置为适合我的驱动器内存的数字(例如对于32GB RAM,我将其设置为18G)来解决这个问题。

您可以使用以下命令进行设置:

spark-submit --conf spark.driver.memory=18g

非常重要的提示:根据Spark文档-动态加载Spark属性,如果您从代码中设置此属性,则不会考虑该属性:

Spark属性主要可以分为两种:一种与部署相关,例如“spark.driver.memory”、“spark.executor.instances”,这种属性可能在运行时通过SparkConf以编程方式设置时不受影响,或者其行为取决于您选择的集群管理器和部署模式,因此建议通过配置文件或spark-submit命令行选项进行设置;另一种主要与Spark运行时控制相关,例如“spark.task.maxFailures”,这种属性可以通过任何一种方式设置。


2
你应该使用 --conf spark.driver.memory=18g。 - merenptah

7
总体而言,Spark Executor JVM内存可以分为两部分。Spark内存和用户内存。这由属性spark.memory.fraction控制,其值介于0和1之间。 在处理图像或进行内存密集型处理的Spark应用程序中,考虑减少spark.memory.fraction。这将为您的应用程序工作提供更多的内存。Spark可以溢出,因此它仍将使用较少的内存份额。 问题的第二部分是工作分配。如果可能,将数据分区成较小的块。较小的数据可能需要较少的内存。但如果不可能,则需牺牲计算以换取内存。通常,单个执行程序将运行多个核心。执行程序的总内存必须足以处理所有并发任务的内存要求。如果增加执行程序内存不是一个选项,则可以减少每个执行程序的核心数,以便每个任务获得更多的内存来处理。 测试具有最大可能可用内存的1个核心执行程序,然后逐渐增加核心数量,直到找到最佳核心数量。

5

您是否已经删除了主gc日志?我遇到了类似的问题,并发现SPARK_DRIVER_MEMORY仅设置Xmx堆。初始堆大小保持为1G,堆大小从未扩大到Xmx堆。

通过添加"--conf "spark.driver.extraJavaOptions=-Xms20g" 解决了我的问题。

运行命令 "ps aux | grep java" ,您会看到下面的日志:

24501 30.7 1.7 41782944 2318184 pts/0 Sl+ 18:49 0:33 /usr/java/latest/bin/java -cp /opt/spark/conf/:/opt/spark/jars/* -Xmx30g -Xms20g


5

在Spark 1.0.0中,设置内存堆大小的位置位于conf/spark-env文件中。相关变量是SPARK_EXECUTOR_MEMORYSPARK_DRIVER_MEMORY。更多文档请参考部署指南

此外,不要忘记将配置文件复制到所有从节点。


5
你如何知道在 SPARK_EXECUTOR_MEMORYSPARK_DRIVER_MEMORY 中应该调整哪一个? - makansij
18
什么错误会提示你增加SPARK_EXECUTOR_MEMORY,什么错误会提示你增加SPARK_DRIVER_MEMORY - makansij

3
我有几个建议针对上述错误:
● 检查执行器内存分配,因为执行器可能需要处理需要比所分配的更多内存的分区。
● 尝试查看是否有更多的shuffle在运行,因为shuffle是昂贵的操作,因为它们涉及磁盘I/O、数据序列化和网络I/O。
● 使用广播连接。
● 避免使用 groupByKeys,并尝试用 ReduceByKey 替换。
● 在发生 shuffle 的地方避免使用巨大的 Java 对象。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接