在具有75GB内存的EMR集群上,“容器因超过内存限制而被YARN终止。使用了10.4 GB的10.4 GB物理内存。”

67
我在AWS EMR上运行了一个5节点的Spark集群,每个节点大小为m3.xlarge(1个主节点和4个从节点)。我成功地处理了一个146MB的bzip2压缩CSV文件,并得到了完美的聚合结果。
现在我正在尝试在这个集群上处理一个约5GB的bzip2 CSV文件,但是我收到了以下错误消息:
16/11/23 17:29:53 WARN TaskSetManager: 在第6.0阶段中,任务49.2丢失(TID xxx, xxx.xxx.xxx.compute.internal):ExecutorLostFailure(执行器16退出,由正在运行的任务之一引起)原因:容器由YARN杀死以超出内存限制。10.4 GB的10.4 GB物理内存已使用。请考虑提高spark.yarn.executor.memoryOverhead。
我感到困惑的是为什么在一个约75GB的集群上,我会得到一个约10.5GB的内存限制(每个3m.xlarge实例15GB)...
这是我的EMR配置:
[
 {
  "classification":"spark-env",
  "properties":{

  },
  "configurations":[
     {
        "classification":"export",
        "properties":{
           "PYSPARK_PYTHON":"python34"
        },
        "configurations":[

        ]
     }
  ]
},
{
  "classification":"spark",
  "properties":{
     "maximizeResourceAllocation":"true"
  },
  "configurations":[

  ]
 }
]

根据我所了解的,设置maximizeResourceAllocation属性应该会告诉EMR配置Spark以充分利用集群上所有可用的资源。即,我应该有大约75GB的内存可用...那么为什么我会遇到大约10.5GB的内存限制错误呢?以下是我运行的代码:

def sessionize(raw_data, timeout):
# https://www.dataiku.com/learn/guide/code/reshaping_data/sessionization.html
    window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
              .orderBy("timestamp"))
    diff = (pyspark.sql.functions.lag(raw_data.timestamp, 1)
            .over(window))
    time_diff = (raw_data.withColumn("time_diff", raw_data.timestamp - diff)
                 .withColumn("new_session", pyspark.sql.functions.when(pyspark.sql.functions.col("time_diff") >= timeout.seconds, 1).otherwise(0)))
    window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
              .orderBy("timestamp")
              .rowsBetween(-1, 0))
    sessions = (time_diff.withColumn("session_id", pyspark.sql.functions.concat_ws("_", "user_id", "site_id", pyspark.sql.functions.sum("new_session").over(window))))
    return sessions
def aggregate_sessions(sessions):
    median = pyspark.sql.functions.udf(lambda x: statistics.median(x))
    aggregated = sessions.groupBy(pyspark.sql.functions.col("session_id")).agg(
        pyspark.sql.functions.first("site_id").alias("site_id"),
        pyspark.sql.functions.first("user_id").alias("user_id"),
        pyspark.sql.functions.count("id").alias("hits"),
        pyspark.sql.functions.min("timestamp").alias("start"),
        pyspark.sql.functions.max("timestamp").alias("finish"),
        median(pyspark.sql.functions.collect_list("foo")).alias("foo"),
    )
    return aggregated
 spark_context = pyspark.SparkContext(appName="process-raw-data")
spark_session = pyspark.sql.SparkSession(spark_context)
raw_data = spark_session.read.csv(sys.argv[1],
                                  header=True,
                                  inferSchema=True)
# Windowing doesn't seem to play nicely with TimestampTypes.
#
# Should be able to do this within the ``spark.read.csv`` call, I'd
# think. Need to look into it.
convert_to_unix = pyspark.sql.functions.udf(lambda s: arrow.get(s).timestamp)
raw_data = raw_data.withColumn("timestamp",
                               convert_to_unix(pyspark.sql.functions.col("timestamp")))
sessions = sessionize(raw_data, SESSION_TIMEOUT)
aggregated = aggregate_sessions(sessions)
aggregated.foreach(save_session)

基本上,不过是窗口化和groupBy集合数据。

它始于几个这样的错误,并且随着相同错误数量的增加而逐渐停止。

我尝试使用--conf spark.yarn.executor.memoryOverhead运行spark-submit,但那似乎也无法解决问题。


你可以贴出完整的错误日志吗?根据你的描述,我们很难理解问题所在。 - eliasah
你可以在 Spark Submit 中传递属性(例如:--conf spark.executor.memory=20g)。 - mrsrinivas
m3.xlarge有15GB的内存,因此您不能设置spark.executor.memory=20G。一些内存必须为操作系统保留(大约1GB),您可能还需要稍微增加memoryOverhead到2GB左右。这将使您的executor-memory剩下12GB。 - Glennie Helles Sindholt
@mrsrinivas,我认为groupBy只是RDD的一个问题。DataFrame API是否也存在同样的问题?您是否建议尝试使用窗口方法来聚合数据,而不是使用groupBy? - lauri108
@mrsrinivas 和 @glennie-helles-sindholt,我已经使用1+4台m3.xlarge机器重新运行了代码:spark-submit --deploy-mode cluster --conf spark.executor.memory=12g --conf spark.yarn.executor.memoryOverhead=2048 --conf spark.memory.fraction=0.8 --conf spark.memory.storageFraction=0.35。在步骤开始时立即出现以下错误:Exception in thread "main" java.lang.IllegalArgumentException: Required executor memory (12288+2048 MB) is above the max threshold (11520 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb' - lauri108
显示剩余6条评论
5个回答

73

我能理解你的困境。

我们使用Spark on YARN时也遇到了类似的内存不足问题。我们有五个64GB、16核心的虚拟机,无论我们将 spark.yarn.executor.memoryOverhead 设置为多少,都无法为这些任务提供足够的内存--它们最终会死亡,无论我们分配了多少内存。而这是一个相对简单的Spark应用程序导致的问题。

我们发现,虚拟机的物理内存使用率非常低,但虚拟内存使用率却极高(尽管日志抱怨的是"物理"内存)。我们在yarn-site.xml 中设置了 yarn.nodemanager.vmem-check-enabledfalse,之后我们的容器就不再被杀死,应用程序看起来正常运行。

经过更多研究,我在这里找到了为什么会发生这种情况的答案:http://web.archive.org/web/20190806000138/https://mapr.com/blog/best-practices-yarn-resource-management/

由于Centos/RHEL 6存在对虚拟内存的过度分配,因此您应该禁用虚拟内存检查器或将yarn.nodemanager.vmem-pmem-ratio增加到相对较大的值。

该页面链接到了IBM的一个非常有用的页面:https://web.archive.org/web/20170703001345/https://www.ibm.com/developerworks/community/blogs/kevgrig/entry/linux_glibc_2_10_rhel_6_malloc_may_show_excessive_virtual_memory_usage?lang=en

总之, glibc > 2.10 改变了其内存分配方式。虽然分配大量的虚拟内存并不是世界末日,但它与 YARN 的默认设置不兼容。 您可以尝试在 hadoop-env.sh 中使用 MALLOC_ARENA_MAX 环境变量将其设置为较低的数字,而不是将 yarn.nodemanager.vmem-check-enabled 设置为 false。这个错误报告提供了有关此信息的帮助: https://issues.apache.org/jira/browse/HADOOP-7154 我建议阅读这两个页面 - 信息非常有用。

2
属性为 yarn.nodemanager.vmem-check-enabled,注意连字符。 - Joffer
我在yarn-site.xml中没有找到这个属性。我正在使用Amazon EMR上的Spark。 - lfvv
1
@lfvv,您可能需要手动添加它。您可以在此处找到各种其他设置:https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-common/yarn-default.xml - Duff
3
我认为告诉资源管理器不再正确管理其资源并不是一个好的解决方案。 - Clemens Valiente
@ClemensValiente 我觉得你是对的... 如果是这样的话,也许调整MALLOC_ARENA_MAX是更好的选择。不过我自己还没有尝试过。 - Duff
显示剩余3条评论

20

如果您没有使用 spark-submit 命令,且正在寻找另一种指定 yarn.nodemanager.vmem-check-enabled 参数的方法(这个参数是由 Duff 提到的),以下有两种其他方法:

方法 2

如果您正在使用 JSON 配置文件(该文件会传递给 AWS CLI 或 boto3 脚本),则需要添加以下配置:

[{
"Classification": "yarn-site", 
  "Properties": {
    "yarn.nodemanager.vmem-check-enabled": "false"
   }
}]

方法三

如果您使用 EMR 控制台,请添加以下配置:

classification=yarn-site,properties=[yarn.nodemanager.vmem-check-enabled=false]

13

看,

我在一个巨大的集群中遇到了同样的问题。将内存添加到worker上并不能解决问题。有时在聚合过程中,Spark将使用超出其拥有的内存,并且Spark作业将开始使用堆外内存。

一个简单的例子是:

如果您有一个数据集需要进行reduceByKey,则有时会在一个worker上聚合比其他worker更多的数据。如果这些数据超过了一个worker的内存,则会出现该错误消息。

添加选项spark.yarn.executor.memoryOverhead将有所帮助,如果您为worker使用的内存设置50%(仅供测试,看看它是否有效,您可以通过更多测试添加较少的内存)。

但是,您需要了解Spark如何在集群中分配内存:

  1. Spark最常用的方式是使用机器内存的75%,其余部分用于操作系统。
  2. Spark在执行过程中有两种类型的内存。一部分用于执行,另一部分用于存储。执行用于Shuffles、Joins、Aggregations等等。存储用于缓存和在集群中传播数据。

关于内存分配的一个好处是,如果您的执行中没有使用缓存,则可以设置Spark使用该存储空间来处理执行,以避免部分OOM错误。正如您可以在Spark文档中看到的那样:

这种设计确保了几个理想的属性。首先,不使用缓存的应用程序可以使用整个空间进行执行,从而避免不必要的磁盘溢出。其次,使用缓存的应用程序可以保留最小存储空间(R),其中它们的数据块不会被驱逐。最后,这种方法为各种工作负载提供了合理的开箱即用性能,而不需要用户了解内部内存如何划分的专业知识。

但我们怎么使用呢?

您可以更改一些配置。在作业调用中添加MemoryOverhead配置项,但要考虑添加spark.memory.fraction并将其更改为0.8或0.85,并将spark.memory.storageFraction减少到0.35或0.2。
其他配置也可能有帮助,但需要根据您的情况进行检查。您可以在此处查看所有这些配置:链接
对于我的情况,我有一个有2.5K工作节点和2.5TB内存的集群。我们遇到了像您一样的OOM错误。我们只需将spark.yarn.executor.memoryOverhead增加到2048,并启用动态分配。当我们调用作业时,我们不设置工作节点的内存,而是让Spark来决定。我们只设置Overhead。
但是对于我的小型集群的一些测试,更改执行和存储内存的大小就解决了问题。

2
我已经使用1+4个m3.xlarge机器集群重新运行了这些参数:spark-submit --deploy-mode cluster --conf spark.executor.memory=12g --conf spark.yarn.executor.memoryOverhead=2048 --conf spark.memory.fraction=0.8 --conf spark.memory.storageFraction=0.35,并在步骤开始时立即出现以下错误:Exception in thread "main" java.lang.IllegalArgumentException: Required executor memory (12288+2048 MB) is above the max threshold (11520 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb' - lauri108
1
这条信息告诉你需要做什么:你的 spark.executor.memory + spark.yarn.executor.memoryOverhead 必须小于 yarn.nodemanager.resource.memory-mb。我建议你减少 memoryOverhead,对于一个 15g 的节点,可以设置为 1g(1024 mb),并将你的 yarn.nodemanager.resource.memory-mb 增加到 12288 mb,将你的 spark.executor.memory 减少到 11264 mb。如果这样还不行,那么将 yarn.nodemanager.resource.memory-mb 增加到 13312 mb,并告诉我你的 yarn.scheduler.maximum-allocation-mb 是多少。 - makansij
这是一个比被接受的答案更好、更少侵入性的选项。如果您在YARN实例上已经运行了其他应用程序,更改yarn-site.xml可能会带来很大的风险和广泛的后果。 - josiah
1
我不能评论旧版本,但Spark 2.3.1在堆内存不足时不使用离堆内存。它将内存分成两个池:执行池和存储池。当其中一个池溢出时,它会从另一个池中获取资源。当两个池都满了时,任务会被阻止直到有空闲内存可用。 - Avseiytsev Dmitriy
这个错误信息似乎很误导人。因为问题似乎不是memoryOverhead太低,而是执行器整体内存不足。所以更好的解决方案/错误信息可能是增加执行器内存? - Ted
增加“spark.memory.fraction”并减少“spark.memory.storageFraction”安全吗?Spark配置不建议更改这些默认值。 - bhavi

6
尝试重新分区。在我的情况下,它有效。
当使用write.csv()加载数据框时,最初的数据框并不是很大。数据文件大小约为10MB左右,每个执行器处理任务可能需要几百MB的内存。
我当时检查了分区数,发现是2个。然后在后续操作中(如与其他表连接、添加新列),数据框变得越来越大。到某个阶段,我遇到了内存超限问题。我再次检查分区数,仍然是2个,这可能是由最初的数据框派生出来的。
所以我尝试在一开始对其进行重新分区,之后就没有问题了。
我还没有读过关于Spark和YARN的很多资料。我所知道的是,在节点中有执行器。一个执行器可以处理许多任务,这取决于资源。我的猜测是一个分区将被原子地映射到一个任务上。它的大小决定了资源使用情况。如果一个分区增长得太大,Spark就无法进行切片。
合理的策略是首先确定节点和容器内存,可以是10GB或5GB。理想情况下,两者都可以用于任何数据处理作业,只是时间问题。给定5GB内存设置,您可以通过测试找到一个适合的分区行,比如1000行(在处理过程中不会失败),可以按照以下伪代码进行操作:
RWS_PER_PARTITION = 1000
input_df = spark.write.csv("file_uri", *other_args)
total_rows = input_df.count()
original_num_partitions = input_df.getNumPartitions()
numPartitions = max(total_rows/RWS_PER_PARTITION, original_num_partitions)
input_df = input_df.repartition(numPartitions)

希望这能有所帮助!

1
我在运行相对较小的Spark 2.3.1作业的小集群上遇到了同样的问题。该作业读取parquet文件,使用groupBy / agg / first删除重复项,然后排序并写入新的parquet文件。它在4个节点(4个vcores,32GB RAM)上处理了51GB的parquet文件。
该作业在聚合阶段不断失败。我编写了一个bash脚本来监视执行器内存使用情况,并发现在阶段中间,一个随机执行器会在几秒钟内开始占用双倍内存。当我将这一时刻的时间与GC日志相关联时,它与清空大量内存的full GC匹配。
最后,我明白了问题与GC有关。ParallelGC和G1不断引起此问题,但ConcMarkSweepGC改善了情况。该问题仅出现在少量分区中。我在安装了OpenJDK 64-Bit(build 25.171-b10)的EMR上运行了该作业。我不知道问题的根本原因,可能与JVM或操作系统有关。但在我的情况下,它绝对与堆或非堆使用无关。 更新1 尝试了Oracle HotSpot,问题得以重现。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接