如何避免 Spark 执行器因内存限制被 YARN 容器杀死?

18

我有以下的代码,其中大部分时间都会触发 hiveContext.sql()。我的任务是在处理完所有hive表分区后,创建几个表并向其中插入值。

因此,我首先执行 show partitions 并使用其输出结果在for循环中调用一些方法,这些方法会创建表(如果不存在)并使用 hiveContext.sql 向其中插入数据。

现在,我们无法在executor中执行 hiveContext,因此我必须在 driver 程序的 for 循环中执行它,并按顺序一个接一个地运行。当我在 YARN 集群中提交此 Spark 作业时,几乎每次都会因为找不到 shuffle 而导致我的 executor 丢失。

现在这种情况是由于 YARN 在记忆超载时杀死了我的 executor。我不明白为什么,因为每个 hive 分区的数据集非常小,但仍然会导致 YARN 杀死我的 executor。

以下代码是否会并行执行所有操作,并同时尝试容纳所有 hive 分区的数据集到内存中?

public static void main(String[] args) throws IOException {   
    SparkConf conf = new SparkConf(); 
    SparkContext sc = new SparkContext(conf); 
    HiveContext hc = new HiveContext(sc); 

    DataFrame partitionFrame = hiveContext.sql(" show partitions dbdata partition(date="2015-08-05")"); 
  
    Row[] rowArr = partitionFrame.collect(); 
    for(Row row : rowArr) { 
        String[] splitArr = row.getString(0).split("/"); 
        String server = splitArr[0].split("=")[1]; 
        String date =  splitArr[1].split("=")[1]; 
        String csvPath = "hdfs:///user/db/ext/"+server+".csv"; 
        if(fs.exists(new Path(csvPath))) { 
            hiveContext.sql("ADD FILE " + csvPath); 
        } 
        createInsertIntoTableABC(hc,entity, date); 
        createInsertIntoTableDEF(hc,entity, date); 
        createInsertIntoTableGHI(hc,entity,date); 
        createInsertIntoTableJKL(hc,entity, date); 
        createInsertIntoTableMNO(hc,entity,date); 
   } 
}
2个回答

19

通常情况下(至少在Spark 1.3.1版本中),您应该始终挖掘日志以获取真正的异常信息。

简要概括
在Yarn下安全配置Spark
spark.shuffle.memoryFraction=0.5 - 这将允许shuffle使用更多的分配内存
spark.yarn.executor.memoryOverhead=1024 - 这是设置为MB。当executor-memory + executor.memoryOverhead大于内存使用量时,Yarn会杀死执行器。

更多信息

从您的问题描述中可以看出,您遇到了shuffle未找到的异常。

如果出现 org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 您应该增加spark.shuffle.memoryFraction,例如到0.5。

Yarn杀死执行器最常见的原因是内存使用超出了预期。 为了避免这种情况,您可以增加spark.yarn.executor.memoryOverhead,我设置为1024,即使我的执行器只使用了2-3G的内存。


嗯,Barak,那么重新分区数据集,使每个分区持有更少的数据怎么样? - gsamaras
@gsamaras 数据存储在不同的内存区域中,在Spark 1.3.1中它不是动态的。因此,您实际上不会为洗牌在执行器上“释放”一些内存。您必须明确增加洗牌区域。 话虽如此,如果您减少每个分区的数据量,则可能在映射侧具有较小的洗牌内存需求,因此可能会有所帮助。 请记住,重新分区对过程产生其他影响,因此我不会将其用作解决此特定问题的解决方案。这可能是一个好主意,但这是一个更大的主题 :) - Barak1731475

0

这是我的假设:您的集群上可能有限制的执行器,并且作业可能在共享环境中运行。

正如您所说,因为文件大小较小,您可以设置更少的执行器,增加执行器核心,并设置memoryOverhead属性非常重要。

  1. 设置执行器数量= 5
  2. 设置执行器核心数= 4
  3. 设置内存开销= 2G
  4. shuffle分区= 20(基于执行器和核心使用最大并行性)

使用上述属性,我确信您将避免任何执行器内存不足问题,而不会影响性能。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接