Apache Spark的缓存(cache() / persist())内存消耗

3
当我尝试使用cache()或persist(MEMORY_ONLY_SER())方法缓存我的RDD时,我的Spark集群会挂起。如果我不使用cache()方法,它可以在大约7分钟内计算出结果。
我有6个c3.xlarge EC2实例(每个实例4个核心,7.5 GB RAM),总共提供24个核心和37.7 GB内存。
我在主节点上使用以下命令运行应用程序:
SPARK_MEM=5g MEMORY_FRACTION="0.6" SPARK_HOME="/root/spark" java -cp ./uber-offline.jar:/root/spark/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar pl.instream.dsp.offline.OfflineAnalysis
数据集大小约为50GB,分成24个文件。我将其压缩并存储在S3存储桶中的24个文件中(每个文件大小为7MB到300MB)。
我绝对找不到我集群出现这种行为的原因,但似乎Spark消耗了所有可用内存并进入GC收集循环。当我查看GC详细信息时,我可以发现以下循环:
[GC 5208198K(5208832K), 0,2403780 secs]
[Full GC 5208831K->5208212K(5208832K), 9,8765730 secs]
[Full GC 5208829K->5208238K(5208832K), 9,7567820 secs]
[Full GC 5208829K->5208295K(5208832K), 9,7629460 secs]
[GC 5208301K(5208832K), 0,2403480 secs]
[Full GC 5208831K->5208344K(5208832K), 9,7497710 secs]
[Full GC 5208829K->5208366K(5208832K), 9,7542880 secs]
[Full GC 5208831K->5208415K(5208832K), 9,7574860 secs]

这最终导致消息变成这样:“
WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, ip-xx-xx-xxx-xxx.eu-west-1.compute.internal, 60048, 0) with no recent heart beats: 64828ms exceeds 45000ms

...并且阻止计算机的任何进展。这看起来像是内存被100%消耗了,但我尝试使用更多RAM的机器(例如每台30GB),效果是相同的。

这种行为的原因可能是什么??有人可以帮忙吗?


你的应用程序在持续进行垃圾回收,但内存恢复非常少,说明某些配置或编码错误导致其填充内存。 - jmj
是的 - 这也是我从日志中看到的。问题真正的原因是为什么发生了。 - Bartek
@Bartek,没有实际的代码很难说...但是,50 GB的文件量非常大。如果你一次性加载所有文件并尝试处理,当然会有严重的内存压力。 - Eugene
@Eugene 谢谢您的回复。据我所知,Apache-Spark应该能够处理这么大量的数据,并将部分数据缓存到磁盘上,或在内存出现问题时重新读取它。我没有放置代码,因为我不认为我的代码有错误,1)非常简单;2)如果我不缓存/持久化,它可以正常工作。如果您觉得有用,我可以粘贴它? - Bartek
尝试将一些数据放入缓存中,比如说100万个样本,观察Spark Web UI中内存的消耗情况,然后你就可以找到所需的总内存。 - Thomas Decaux
2个回答

5
尝试使用更多的分区,每个 CPU 应该有 2-4 个。我认为增加分区数量通常是使程序更稳定(并且通常更快)的最简单方法。
默认情况下,我认为您的代码将使用 24 个分区,但对于 50GB 的数据来说,这太少了。我建议至少尝试几百个分区。
接下来,您使用的是SPARK_MEM=5g,但是每个节点都有7.5GB的内存,因此您可以使用SPARK_MEM=7500m
您还可以尝试增加内存分数,但我认为上述方法更有帮助。
一般要点:使用 HDFS 存储文件而不是 S3,速度会快得多。在缓存数据之前确保正确处理数据 - 例如,如果您具有包含100列的 TSV 数据,但您只使用其中的10个字段,请确保在尝试缓存之前提取这些字段。

谢谢,coalesce听起来是一个不错的尝试方向。我会检查它并告诉你结果。coalesce(500,true)可能是一个不错的选择吗? - Bartek
1
在cache()之前添加coalesce(500, true)有所帮助。看起来GC问题已经解决了。 - Bartek
你对默认24个分区的假设是基于什么?S3中的文件数量?总核心数? - Eric Eijkelenboom
@EricEijkelenboom 是的,指的是文件数量。 - samthebest
我在第一次读取数据集时增加了重新分区的输出大小。这解决了我的问题。 - aysebilgegunduz

4

'原始'缓存和'序列化'缓存之间存在很大的区别

  1. 原始缓存:(rdd.cache()rdd.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY))

    这将消耗2-3倍的内存。例如,100MB的rdd可以在内存中消耗350MB。

  2. 序列化缓存(rdd.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER))

    这将几乎消耗相同数量的内存加上一些小的开销。例如,100MB的数据将在内存中消耗100MB + 几KB。

然而,在操作期间,原始缓存更快。序列化缓存需要更长时间(因为对象必须在计算之前进行反序列化)。

这是我实验的一个有趣结果。

enter image description here enter image description here


如果您发现确实有重复,请将它们标记为重复。 - bummi

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接