如何控制Spark应用程序的每个任务/阶段/作业尝试?

3
我希望能够在某些特定异常抛出时阻止Spark重试Spark应用程序。我只想在满足某些条件时限制重试次数。否则,我希望按默认的重试次数进行。请注意,一个Spark应用程序只有一个Spark作业。当出现异常时,我尝试设置javaSparkContext.setLocalProperty("spark.yarn.maxAppAttempts", "1");,但它仍然会重新尝试整个作业。我是这样提交Spark应用程序的:
spark-submit --deploy-mode cluster theSparkApp.jar

我有一个使用情况,如果它是由同一作业的上一次重试创建的,则希望删除输出,但如果输出文件夹不为空(在第1次重试中),则使作业失败。您能想到其他实现此目的的方法吗?


你如何提交Spark应用程序进行部署?正在使用哪些命令行选项和Spark属性?顺便说一句,即使您说的是“整个Spark应用程序”,您仍然使用了“整个作业仍会重试”。一个Spark应用程序可以运行/提交一个或多个Spark作业。 - Jacek Laskowski
请使用spark-submit --deploy-mode cluster --conf spark.yarn.maxAppAttempts=1,并在命令行上设置Spark。 - Jacek Laskowski
1个回答

1
我有一个使用情况,希望在同一作业的上一次重试创建输出时删除输出,但如果输出文件夹不为空(在第1次重试中),则失败作业。你能想到其他实现方式吗? 您可以使用TaskContext来控制您的Spark作业的行为,例如根据重试次数进行如下操作:
val rdd = sc.parallelize(0 to 8, numSlices = 1)

import org.apache.spark.TaskContext

def businessCondition(ctx: TaskContext): Boolean = {
  ctx.attemptNumber == 0
}

val mapped = rdd.map { n =>
  val ctx = TaskContext.get
  if (businessCondition(ctx)) {
    println("Failing the task because business condition is met")
    throw new IllegalArgumentException("attemptNumber == 0")
  }
  println(s"It's ok to proceed -- business condition is NOT met")
  n
}
mapped.count

这里的问题是我不知道我的作业第一次重试失败是由于 businessCondition() 还是其他原因(除非我在 Spark 外部维护这个状态,但我想避免这样做)。所以我能想到的唯一可能的方法是,在满足 businessCondition() 的情况下强制 Spark 失败,而不进行任何重试。 - user401445
addTaskCompletionListener(listener: TaskCompletionListener): TaskContextaddTaskFailureListener(listener: TaskFailureListener): TaskContext 这两个方法怎么样?虽然我以前从未使用过它们,但看起来它们可能会在这里有所帮助。 - Jacek Laskowski
onApplicationEnd 可以工作,但是在 SparkListener 中无法获取 TaskContext。我需要 TaskContext 来确定要删除什么。此外,我不确定在监听器接口中是否可能找到应用程序成功或失败的信息。 - user401445

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接