Spark内存溢出

11

我有一个包含150GB的文本文件夹(大约700个文件,平均每个文件200MB左右)。

我使用scala来处理这些文件,并最终计算一些聚合统计数据。我看到两种可能的方法:

  • 手动循环遍历所有文件,对于每个文件进行计算并在最后合并结果
  • 将整个文件夹读入一个RDD,在单个RDD上执行所有操作,并让spark进行所有并行化

我倾向于采用第二种方法,因为它更加简洁(不需要特定于并行化的代码),但是我想知道我的情况是否符合硬件和数据所施加的限制。我有一台工作站可用,拥有16个线程和64 GB的RAM(因此并行化只能在不同的处理器核之间严格进行)。我以后可能会扩展基础设施,但目前我只想专注于调整这个工作站场景的设置。

我正在使用的代码: - 读取TSV文件,并提取有意义的数据到(String,String,String)三元组中 - 然后进行一些过滤、映射和分组 - 最后,缩小数据,并计算一些聚合数据

我已经能够运行单个文件(大约200MB的数据)的代码,但是当添加更多数据时,我会遇到java.lang.OutOfMemoryError: GC overhead limit exceeded和/或Java out of heap异常(该应用程序使用6GB的数据就会崩溃,但我希望将其用于150GB的数据)。

我猜我需要调整一些参数才能使其正常工作。我会感激任何关于如何解决这个问题(如何调试内存需求)的提示。我已经尝试增加“spark.executor.memory”并使用较少的核心数(理性的推断是每个核心需要一些堆空间),但这没有解决我的问题。

我不需要解决方案非常快(即使需要几个小时甚至几天也可以)。我也没有对数据进行缓存,只是最终保存到文件系统中。如果您认为手动并行化方法更可行,我也可以这样做。


1
如果您不打算使用分布式集群,那么您如何将150G适配到64RAM中呢? - eliasah
2
我在考虑采用一种方式,将数据分块处理,如果需要的话,将部分结果存储到磁盘上,继续处理下一个数据块,直到所有数据块都处理完毕,最后合并部分结果。 - Igor
这要看你的数据是否连续。另一个可能性是使用分布式搜索引擎,如Solr或Elasticsearch对数据进行索引,然后您可以运行统计功能。一切都取决于数据模式以及实际使用方式。 - eliasah
2
通过大幅增加分区的数量,可以达到你想要的效果,即逐位处理。这个答案列出了你可以尝试的所有方法:https://dev59.com/DmEi5IYBdhLWcg3wltIl#22742982 - samthebest
我会先理解您想要做什么以及如何以非常简单的术语来表达。如果性能不是问题,为什么需要将它们加载到内存中?您执行的算法是否具有迭代性质?如果没有,也许Hadoop MR会是更好的选择。根据我的经验,我发现在内存方面过度使用Spark会带来一些麻烦。例如,Spark会逐个部分地驱逐缓存的RDD并运行GC。这会影响性能。(旧版本可能已修复)。我还发现,在使用Spark之前,使用Hadoop MR预处理数据非常有用。希望这可以帮助到您。 - Ioannis Deligiannis
显示剩余4条评论
3个回答

4
我和我的团队成功地在5台每台32GB RAM的机器上处理了一个超过1TB大小的csv数据。这主要取决于你正在进行的处理类型以及如何处理。
以下是需要注意的几点:
  1. 如果重新分区RDD,则需要额外的计算,这会产生超出堆大小的开销。尝试通过减少TextInputFormat.SPLIT_MINSIZETextInputFormat.SPLIT_MAXSIZE(如果使用)中的拆分大小来提高并行性,从而使用更多的并行性加载文件。

  2. 尝试使用mapPartition而不是map,以便您可以在分区内处理计算。如果计算使用临时变量或实例,并且仍然面临内存不足问题,请尝试降低每个分区的数据量(增加分区数)。

  3. 在创建Spark Context之前,在Spark配置中使用“spark.executor.memory”和“spark.driver.memory”来增加驱动程序内存和执行器内存限制。

请注意,Spark是一种通用的集群计算系统,因此在单台计算机上使用Spark效率较低(我认为)。

你有使用有限内存读取大文件的示例代码吗?尤其是如何使用TextInputFormat.SPLIT_MAXSIZEmapPartitions?我正在使用conf.set("TextInputFormat.SPLIT_MAXSIZE", "512M"),但没有成功。 - Kane

0
为了从代码角度添加另一个视角(而不是配置),有时最好找出您的Spark应用程序在哪个阶段超出了内存,并查看是否可以进行更改以解决问题。当我学习Spark时,我有一个Python Spark应用程序,它因OOM错误而崩溃。原因是我将所有结果都收集到主节点,而不是让任务保存输出。
例如:
for item in processed_data.collect():
   print(item)
  • 由于OOM错误而失败。另一方面,

processed_data.saveAsTextFile(output_dir)

  • 运行良好。

0

是的,PySpark RDD/DataFrame collect() 函数用于检索数据集(从所有节点)的所有元素到驱动程序节点。通常在 filter()group()count() 等操作之后,我们应该在较小的数据集上使用 collect()。检索较大的数据集会导致内存不足。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接