在处理大型数据集时运行Spark时出现“sparkContext已关闭”的错误。

23

在集群上运行sparkJob时,当数据量超过一定大小(约2.5GB)时,我会收到“作业被取消,因为SparkContext已关闭”或“执行器丢失”的错误。在查看yarn gui时,我发现被终止的作业实际上是成功完成了的。如果处理的数据小于500MB,则没有任何问题。我正在寻找解决方案,并发现以下内容: -“似乎yarn会杀掉一些执行器,因为它们请求的内存超出了预期。”

有什么建议如何调试这个问题吗?

我提交Spark作业的命令:

/opt/spark-1.5.0-bin-hadoop2.4/bin/spark-submit  --driver-memory 22g --driver-cores 4 --num-executors 15 --executor-memory 6g --executor-cores 6  --class sparkTesting.Runner   --master yarn-client myJar.jar jarArguments

以及sparkContext设置

val sparkConf = (new SparkConf()
    .set("spark.driver.maxResultSize", "21g")
    .set("spark.akka.frameSize", "2011")
    .set("spark.eventLog.enabled", "true")
    .set("spark.eventLog.enabled", "true")
    .set("spark.eventLog.dir", configVar.sparkLogDir)
    )

简化后的代码看起来是这样的

 val hc = new org.apache.spark.sql.hive.HiveContext(sc)
val broadcastParser = sc.broadcast(new Parser())

val featuresRdd = hc.sql("select "+ configVar.columnName + " from " + configVar.Table +" ORDER BY RAND() LIMIT " + configVar.Articles)
val myRdd : org.apache.spark.rdd.RDD[String] = featuresRdd.map(doSomething(_,broadcastParser))

val allWords= featuresRdd
  .flatMap(line => line.split(" "))
  .count

val wordQuantiles= featuresRdd
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .map(pair => (pair._2 , pair._2))
  .reduceByKey(_+_)
  .sortBy(_._1)
  .collect
  .scanLeft((0,0.0)) ( (res,add) => (add._1, res._2+add._2) )
  .map(entry => (entry._1,entry._2/allWords))

val dictionary = featuresRdd
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _) // here I have Rdd of word,count tuples
  .filter(_._2 >= moreThan)
  .filter(_._2 <= lessThan)
  .filter(_._1.trim!=(""))
  .map(_._1)
  .zipWithIndex
  .collect
  .toMap

错误堆栈

Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:703)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:702)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1511)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1435)
at org.apache.spark.SparkContext$$anonfun$stop$7.apply$mcV$sp(SparkContext.scala:1715)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1714)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:146)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
at org.apache.spark.rdd.RDD.count(RDD.scala:1121)
at sparkTesting.InputGenerationAndDictionaryComputations$.createDictionary(InputGenerationAndDictionaryComputations.scala:50)
at sparkTesting.Runner$.main(Runner.scala:133)
at sparkTesting.Runner.main(Runner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

12
根据我的经验,这几乎总是由于OOM异常引起的。请尝试查看每个执行器机器上的日志文件。 - Glennie Helles Sindholt
2
我会从你的作业中打印stacktrace,并使用一些Java util工具:如jstat、jstatd、jconsole监视JVM堆大小...以了解更多限制信息。如果您仍有物理内存,可以在启动应用程序之前增加JVM内存大小!您可以基于优化的堆大小调整您的集合大小。 - Gabor Jakab
1
我在pyspark中也遇到了这个问题。在我的情况下,这是由于容器内存不足导致的,我们可以在启动spark实例时使用参数--executor.memory YOUR_MEMORY_SIZE来调整内存大小。如果你的spark运行在hadoop上,这个值不能超过yarn设置的yarn.scheduler.maximum-allocation-mb的值。在hadoop 2.7.7中,yarn.scheduler.maximum-allocation-mb的默认值为8192MB。在我的情况下,我使用以下命令来运行pyspark:pyspark --master yarn --executor.memory,解决了这个问题。 - libin
3个回答

11

找到答案了。

我的表格以20GB的avro文件保存。当执行器试图打开它时,它们每个人都必须将20GB加载到内存中。通过使用csv而不是avro解决了这个问题。


8

这种情况的症状通常是执行者任务中的 OutOfMemory 错误。尝试在启动作业时增加执行器的内存。参见 spark-submit、spark-shell 等参数 --executor-memory。默认值为1G。


1
另一个可能导致“SparkContext已关闭”错误的原因是在评估其他代码后导入jar文件。(这可能仅在Spark Notebook中发生。)
要解决此问题,请将所有:cp myjar.jar语句移至文件开头。

6
在你对这篇文章进行点踩之前,请提出改进的建议。谢谢! - Josiah Yoder

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接