Spark作业完成但应用程序需要时间关闭

14

使用Scala运行Spark作业,所有作业都按时完成,但是在作业停止之前会打印一些INFO日志,持续20-25分钟。

以下是几个UI截图,可以帮助理解问题。

  1. 以下是4个阶段所需的时间:

Time taken by 4 stages

  1. 以下是连续作业ID之间的时间 time between consecutive job ids

我不明白为什么在两个作业ID之间花费了这么多时间。

以下是我的代码片段:

    val sc = new SparkContext(conf)
for (x <- 0 to 10) {
  val zz = getFilesList(lin);
  val links = zz._1
  val path = zz._2
  lin = zz._3
  val z = sc.textFile(links.mkString(",")).map(t => t.split('\t')).filter(t => t(4) == "xx" && t(6) == "x").map(t => titan2(t)).filter(t => t.length > 35).map(t => ((t(34)), (t(35), t(5), t(32), t(33))))
  val way_nodes = sc.textFile(way_source).map(t => t.split(";")).map(t => (t(0), t(1)));
  val t = z.join(way_nodes).map(t => (t._2._1._2, Array(Array(t._2._1._2, t._2._1._3, t._2._1._4, t._2._1._1, t._2._2)))).reduceByKey((t, y) => t ++ y).map(t => process(t)).flatMap(t => t).combineByKey(createTimeCombiner, timeCombiner, timeMerger).map(averagingFunction).map(t => t._1 + "," + t._2)
  t.saveAsTextFile(path)
}
sc.stop()

更多跟进:Spark 1.4.1在EMR 4.0.0上将saveAsTextFile保存到S3非常缓慢


我通常建议使用Databricks的spark-csv包而不是saveAsTextFile,但除此之外,您正在运行哪个版本的Spark? - Glennie Helles Sindholt
saveAsTextFile的优点是我可以直接将所有内容保存在S3上,不确定spark-csv包databricks如何工作。感谢您提供的指导,我会继续研究它。 Spark - 1.4.1 Scala - 2.10.6 - Harshit
3个回答

23

正如我在评论中所提到的,我建议使用spark-csv包而不是sc.saveAsTextFile,并且使用该包直接向s3写入没有问题 :)

我不知道您是否使用s3或s3n,但可以尝试切换。我在Spark 1.5.2(EMR-4.2)上使用s3a时遇到了写入超时的问题,一直切换回s3才解决了这个问题,所以值得一试。

另外应该能加速向s3写入的方法之一是使用DirectOutputCommiter。

conf.set("spark.hadoop.mapred.output.committer.class","com.appsflyer.spark.DirectOutputCommitter")

禁用生成 _SUCCESS 文件:

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
请注意,禁用_SUCCESS文件必须在SparkContext的Hadoop配置中设置,而不是在SparkConf中设置。
希望这可以帮助您。

请问您能否建议如何处理此问题:https://stackoverflow.com/questions/62036791/while-writing-to-hdfs-path-getting-error-java-io-ioexception-failed-to-rename - BdEngineer

2

在将文件写入S3时,我遇到了相同的问题。我使用的是spark 2.0版本,为您提供一个经过验证的答案的更新代码:

在Spark 2.0中,您可以使用以下代码:

val spark = SparkSession.builder().master("local[*]").appName("App_name").getOrCreate()

spark.conf.set("spark.hadoop.mapred.output.committer.class","com.appsflyer.spark.DirectOutputCommitter")
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

这解决了我的工作卡顿问题。

请问您能否建议如何处理此问题 https://stackoverflow.com/questions/62036791/while-writing-to-hdfs-path-getting-error-java-io-ioexception-failed-to-rename - BdEngineer

2

我最终升级了我的Spark版本,问题得到了解决。


请问您能否建议如何处理此问题:https://stackoverflow.com/questions/62036791/while-writing-to-hdfs-path-getting-error-java-io-ioexception-failed-to-rename - BdEngineer

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接