在使用Kafka直接流时,Yarn上的Spark堆外内存泄漏问题

17

我正在使用Java 1.8.0_45和Scala 2.11支持的Spark Streaming 1.4.0在Yarn(Apache distribution 2.6.0)上运行,并使用Kafka直接流。我遇到的问题是,驱动程序和执行程序容器的物理内存使用量逐渐增加,直到yarn容器停止它们。我在我的驱动程序中配置了192M堆空间和384离堆空间,但最终还是使用完了。

堆内存看起来正常,有定期的GC循环。任何这样的运行时都没有遇到OutOfMemory错误。

事实上,我没有在Kafka队列上生成任何流量,但仍然会发生这种情况。以下是我使用的代码:

object SimpleSparkStreaming extends App {

val conf = new SparkConf()
val ssc = new StreamingContext(conf,Seconds(conf.getLong("spark.batch.window.size",1L)));
ssc.checkpoint("checkpoint")
val topics = Set(conf.get("spark.kafka.topic.name")); 
    val kafkaParams = Map[String, String]("metadata.broker.list" -> conf.get("spark.kafka.broker.list"))
            val kafkaStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, topics)
            kafkaStream.foreachRDD(rdd => {
                rdd.foreach(x => {
                    println(x._2)
                })

            })
    kafkaStream.print()
            ssc.start() 

            ssc.awaitTermination()

}

我正在CentOS 7上运行此命令。用于提交spark的命令如下:

./bin/spark-submit --class com.rasa.cloud.prototype.spark.SimpleSparkStreaming \
--conf spark.yarn.executor.memoryOverhead=256 \
--conf spark.yarn.driver.memoryOverhead=384 \
--conf spark.kafka.topic.name=test \
--conf spark.kafka.broker.list=172.31.45.218:9092 \
--conf spark.batch.window.size=1 \
--conf spark.app.name="Simple Spark Kafka application" \
--master yarn-cluster \
--num-executors 1 \
--driver-memory 192m \
--executor-memory 128m \
--executor-cores 1 \
/home/centos/spark-poc/target/lib/spark-streaming-prototype-0.0.1-SNAPSHOT.jar 
任何帮助都将不胜感激。
致意,
Apoorva

1
我遇到了同样的问题,你找到解决方案了吗? - crak
我遇到了类似的问题,但还没有达到饱和点:https://dev59.com/zJTfa4cB1Zd3GeqPSpFN - Mohitt
请告诉我如果您找到了解决方案。 - Mohitt
我发现自己处于相同的情况中,不知道你是否找到了原因或解决方法? - Felix
3个回答

1
尝试增加执行器核心。在您的示例中,唯一的核心专门用于消耗流数据,没有核心处理传入的数据。

这是DirectStream,一个执行器核心就可以了。http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers - Artur Bartnik

0
可能是内存泄漏...你试过使用conf.set("spark.executor.extraJavaOptions","-XX:+UseG1GC")吗?

0

这不是关于Kafka的答案,而是关于Spark及其目录系统在一致持久性和大型操作方面表现差的问题。如果您一直写入到持久层(即在大型操作后重复将DF重新持久化再次运行)或运行大型查询(即inputDF.distinct.count),则Spark作业将开始将一些数据放入内存,并且效率低下地删除旧的对象。

这意味着,随着时间的推移,能够快速运行一次的对象将逐渐变慢,直到没有足够的可用内存。对于每个人在家中,可以启动一个具有大型DataFrame的AWS EMR,并在环境中运行以下查询:

var iterator = 1
val endState = 15
var currentCount = 0
while (iterator <= endState) {
  currentCount = inputDF.distinct.count
  print("The number of unique records are : " + currentCount)
  iterator = iterator + 1
}

在作业运行时,请观察Spark UI的内存管理情况,如果DF足够大以满足会话需求,您将开始注意到每次运行时间的下降,主要是块变得陈旧,但Spark无法确定何时清除这些块。
我发现解决此问题的最佳方法是通过在本地编写我的DF,清除持久性层并重新加载数据来解决。这是一个“破玉劈金”的方法,但对于我的业务案例来说,这是一种易于实施的解决方案,使我们的大型表格的运行时间增加了90%(从需要540分钟减少到40分钟,使用更少的内存)。
我目前使用的代码是:
val interimDF = inputDF.action
val tempDF = interimDF.write.format(...).option("...","...").save("...")
spark.catalog.clearCache
val interimDF = spark.read..format(...).option("...","...").save("...").persist
interimDF.count

如果您在子进程中不取消持久化DF,则会出现导数问题:

val interimDF = inputDF.action
val tempDF = interimDF.write.format(...).option("...","...").save("...")
for ((k,v) <- sc.getPersistentRDDs) {
  v.unpersist()
}
val interimDF = spark.read..format(...).option("...","...").save("...").persist
interimDF.count

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接