如何在Airflow中强制使任务失败?

53

我有一个Python可调用函数process_csv_entries,它处理csv文件中的条目。只有当所有条目成功处理时,我的任务才能成功完成。否则,任务应该失败。


def process_csv_entries(csv_file):
    # Boolean 
    file_completely_parsed = <call_to_module_to_parse_csv>
    return not file_completely_parsed

CSV_FILE=<Sets path to csv file>
t1 = PythonOperator(dag=dag,
                      task_id='parse_csv_completely',
                      python_operator=process_csv_entries,
                      op_args=[CSV_FILE])

t1似乎无论返回值如何都能成功完成。 我该如何强制PythonOperator任务失败?

4个回答

54

当遇到错误条件时引发异常(在您的情况下:当文件未被成功解析时)

raise ValueError('File not parsed completely/correctly')

使用适当的消息引发相关错误类型


3
这有效。谢谢!我希望有更好的方法来处理这个。 - Mask
16
不过,这不会强制将任务实例状态设置为“失败”... 有没有办法绕过重试配置? - c-a

48

是的,引发 AirflowException,这将立即导致任务转移到失败状态。

from airflow import AirflowException

ValueError 可用于失败和重试。


2
任何异常都会导致任务失败并将其移至失败状态。 - Beau B.
18
如果设置了重试,有没有一种方法可以防止空气流重新尝试任务?例如,有些错误是您不想/不需要重试的,例如与无效输入相关的错误。 - Joe J
2
@JoeJ,这个问题已经有一个PR了:https://github.com/apache/airflow/pull/7133 - gabra
AirflowException还避免在日志中打印堆栈跟踪信息。+1 - Henrique Mendonça

28

如果你想在不重试的情况下使任务失败,请使用AirflowFailException :-

示例:-

from airflow.exceptions import AirflowFailException
def task_to_fail():
    raise AirflowFailException("Our api key is bad!")

如果您正在寻找重试,请使用 AirflowException

例如:

from airflow import AirflowException
def task_to_fail():
    raise AirflowException("Error msj")

26

7
如果你想知道“现在”版本是什么意思,它是在1.10.11中引入的。 - ZaxR

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接