I am now trying to process a ~5 GB bzip2-compressed CSV file on this cluster, but I keep running into the following error:
16/11/23 17:29:53 WARN TaskSetManager: Lost task 49.2 in stage 6.0 (TID xxx, xxx.xxx.xxx.compute.internal): ExecutorLostFailure (executor 16 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 10.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
What confuses me is why I am hitting a ~10.5 GB memory limit on a ~75 GB cluster (15 GB per m3.xlarge instance)...
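To make the numbers concrete, here is a rough sketch of the per-container arithmetic as I understand it. The executor heap value is only an assumed illustration, and the 11520 MB cap is taken from the IllegalArgumentException quoted in the comments at the end; neither is an authoritative EMR default.

# Rough, assumed arithmetic only -- not authoritative EMR defaults.
# YARN enforces the limit per container (per executor), not per cluster.
yarn_max_allocation_mb = 11520                   # from the error quoted in the comments below
executor_heap_mb = 9658                          # assumed heap chosen by maximizeResourceAllocation
overhead_mb = max(384, executor_heap_mb // 10)   # Spark's default memoryOverhead: 10%, minimum 384 MB
container_mb = executor_heap_mb + overhead_mb    # 9658 + 965 = 10623 MB, i.e. roughly the 10.4 GB in the error
print(container_mb / 1024.0)

If that arithmetic is right, the 10.4 GB in the error is the size of a single YARN container on one node, not a cluster-wide budget.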
Here is my EMR configuration:
[
  {
    "classification": "spark-env",
    "properties": {},
    "configurations": [
      {
        "classification": "export",
        "properties": {
          "PYSPARK_PYTHON": "python34"
        },
        "configurations": []
      }
    ]
  },
  {
    "classification": "spark",
    "properties": {
      "maximizeResourceAllocation": "true"
    },
    "configurations": []
  }
]
From what I understand, setting the maximizeResourceAllocation property should tell EMR to configure Spark to make full use of all the resources available on the cluster. That is, I should have roughly 75 GB of memory available... so why am I hitting a ~10.5 GB memory-limit error? Here is the code I am running:
import statistics
import sys

import arrow
import pyspark
import pyspark.sql.functions


def sessionize(raw_data, timeout):
    # https://www.dataiku.com/learn/guide/code/reshaping_data/sessionization.html
    window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
              .orderBy("timestamp"))
    diff = (pyspark.sql.functions.lag(raw_data.timestamp, 1)
            .over(window))
    time_diff = (raw_data.withColumn("time_diff", raw_data.timestamp - diff)
                 .withColumn("new_session", pyspark.sql.functions.when(pyspark.sql.functions.col("time_diff") >= timeout.seconds, 1).otherwise(0)))
    window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
              .orderBy("timestamp")
              .rowsBetween(-1, 0))
    sessions = (time_diff.withColumn("session_id", pyspark.sql.functions.concat_ws("_", "user_id", "site_id", pyspark.sql.functions.sum("new_session").over(window))))
    return sessions
def aggregate_sessions(sessions):
    median = pyspark.sql.functions.udf(lambda x: statistics.median(x))
    aggregated = sessions.groupBy(pyspark.sql.functions.col("session_id")).agg(
        pyspark.sql.functions.first("site_id").alias("site_id"),
        pyspark.sql.functions.first("user_id").alias("user_id"),
        pyspark.sql.functions.count("id").alias("hits"),
        pyspark.sql.functions.min("timestamp").alias("start"),
        pyspark.sql.functions.max("timestamp").alias("finish"),
        median(pyspark.sql.functions.collect_list("foo")).alias("foo"),
    )
    return aggregated
spark_context = pyspark.SparkContext(appName="process-raw-data")
spark_session = pyspark.sql.SparkSession(spark_context)

raw_data = spark_session.read.csv(sys.argv[1],
                                  header=True,
                                  inferSchema=True)

# Windowing doesn't seem to play nicely with TimestampTypes.
#
# Should be able to do this within the ``spark.read.csv`` call, I'd
# think. Need to look into it.
convert_to_unix = pyspark.sql.functions.udf(lambda s: arrow.get(s).timestamp)

raw_data = raw_data.withColumn("timestamp",
                               convert_to_unix(pyspark.sql.functions.col("timestamp")))

sessions = sessionize(raw_data, SESSION_TIMEOUT)
aggregated = aggregate_sessions(sessions)

aggregated.foreach(save_session)
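(Regarding the "need to look into it" comment in the code above: one possible way to drop the Python UDF for the timestamp conversion is Spark's built-in unix_timestamp. This is only a sketch; the format string is an assumption about how the timestamps look in the CSV.)

# Hedged sketch: convert the string column to Unix seconds with a built-in
# function instead of a Python UDF. The format string is an assumption;
# adjust it to the actual data.
raw_data = raw_data.withColumn(
    "timestamp",
    pyspark.sql.functions.unix_timestamp(
        pyspark.sql.functions.col("timestamp"), "yyyy-MM-dd HH:mm:ss"))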
Basically, nothing more than windowing and a groupBy to aggregate the data.
The job starts with a few of these errors, and the same error keeps accumulating until the job grinds to a halt.
I have tried running spark-submit with --conf spark.yarn.executor.memoryOverhead, but that does not seem to solve the problem either.
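(In case it is useful, here is a small sketch of how one could check, from inside the same PySpark application, what maximizeResourceAllocation actually resolved these settings to; keys that were never set fall back to the placeholder.)

# Sketch: print the memory-related settings the cluster actually applied.
conf = spark_context.getConf()
for key in ("spark.executor.memory",
            "spark.yarn.executor.memoryOverhead",
            "spark.executor.instances",
            "spark.executor.cores"):
    print(key, conf.get(key, "<not set>"))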
Try running with --conf spark.executor.memory=20g. - mrsrinivas
Regarding spark.executor.memory=20G: some memory has to be reserved for the OS (about 1 GB), and you may also need to increase memoryOverhead a bit, to around 2 GB. That leaves you with about 12 GB for executor-memory. - Glennie Helles Sindholt
I tried spark-submit --deploy-mode cluster --conf spark.executor.memory=12g --conf spark.yarn.executor.memoryOverhead=2048 --conf spark.memory.fraction=0.8 --conf spark.memory.storageFraction=0.35 and got the following error immediately at the start of the step: Exception in thread "main" java.lang.IllegalArgumentException: Required executor memory (12288+2048 MB) is above the max threshold (11520 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb' - lauri108