Spark,在EMR中抛出SparkException时出现错误行为

7
我正在使用YARN作为资源管理器,在2个节点上在EMR中运行一个Spark作业。如果我的条件不满足,我需要有意地使步骤失败,以便下一步不会按照配置执行。为了实现这一点,我在将日志消息插入DynamoDB后抛出自定义异常。
它可以正常运行,但是Dynamo中的记录被插入了两次。
以下是我的代码。
if(<condition>) {
  <method call to insert in dynamo> 
  throw new SparkException(<msg>);
  return;
}

如果我删除抛出异常的行,它就可以正常工作,但步骤已经完成。
如何使步骤失败,而不会重复获取日志信息。
感谢您的帮助。
敬礼, Sorabh
1个回答

2
可能你的动力发电机消息被插入两次的原因是因为你的错误条件被两个不同的执行器触发和处理了。Spark将要完成的工作分配给它的工作者,这些工作者没有共享任何知识。
我不确定你需要Spark步骤失败的原因是什么,但我建议你在应用程序代码中跟踪故障情况,而不是试图让Spark直接出错。换句话说,编写检测错误并将其传递回Spark驱动程序的代码,然后根据需要采取相应措施。
一种方法是使用累加器来计算在处理数据时发生的任何错误。它大致看起来像这样(我假设使用Scala和DataFrames,但可以根据需要适应RDD和/或Python):
val accum = sc.longAccumulator("Error Counter")
def doProcessing(a: String, b: String): String = {
   if(condition) {
     accum.add(1)
     null
   }
   else {
     doComputation(a, b)
   }
}
val doProcessingUdf = udf(doProcessing _)

df = df.withColumn("result", doProcessing($"a", $"b"))

df.write.format(..).save(..)  // Accumulator value not computed until an action occurs!

if(accum.value > 0) {
    // An error detected during computation! Do whatever needs to be done.
    <insert dynamo message here>
}

这种方法的一个好处是,如果你正在Spark UI中寻找反馈,你将能够在运行时看到累加器的值。关于累加器的文档,请参考以下链接:http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接