Try setting on_failure_callback when declaring the DAG:
with DAG(
    dag_id="failure_callback_example",
    on_failure_callback=_on_dag_run_fail,
    ...
) as dag:
    ...
The explanation is that an on_failure_callback defined in default_args is passed only to the tasks that are created, not to the DAG object itself.
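The rule above can be pictured with a toy model. This is only a sketch under stated assumptions: ToyDag, ToyTask, add_task, task_alarm, and dag_alarm are hypothetical names invented for illustration and are not Airflow's classes or internals. The sketch only mimics the behavior described here, where default_args feeds the tasks while the DAG-level callback must be given to the DAG itself.

```python
# Toy model only: these classes are hypothetical stand-ins, not Airflow's API.
class ToyTask:
    def __init__(self, task_id, on_failure_callback=None, **_ignored):
        self.task_id = task_id
        self.on_failure_callback = on_failure_callback


class ToyDag:
    def __init__(self, dag_id, default_args=None, on_failure_callback=None):
        self.dag_id = dag_id
        # default_args is kept only so it can be handed to tasks later.
        self.default_args = dict(default_args or {})
        # The DAG-level callback comes solely from the DAG's own argument.
        self.on_failure_callback = on_failure_callback
        self.tasks = []

    def add_task(self, task_id, **task_kwargs):
        # Explicit task arguments take precedence over default_args.
        merged = {**self.default_args, **task_kwargs}
        task = ToyTask(task_id, **merged)
        self.tasks.append(task)
        return task


def task_alarm(context):
    return "task callback"


def dag_alarm(context):
    return "dag callback"


# Callback supplied only through default_args: tasks get it, the DAG does not.
only_defaults = ToyDag(
    "only_defaults", default_args={"on_failure_callback": task_alarm}
)
t1 = only_defaults.add_task("will_fail")

# Callback supplied to the DAG as well: both levels are now covered.
both = ToyDag(
    "both",
    default_args={"on_failure_callback": task_alarm},
    on_failure_callback=dag_alarm,
)
t2 = both.add_task("will_fail")
```

In this toy model, only_defaults ends up with no DAG-level callback even though its task has one, which is the situation the answer describes.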
The following example verifies this behavior:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.bash import BashOperator


def _on_dag_run_fail(context):
    # DAG-level callback: runs when the DAG run as a whole fails.
    print("***DAG failed!! do something***")
    print(f"The DAG failed because: {context['reason']}")
    print(context)


def _alarm(context):
    # Task-level callback: handed to every task through default_args.
    print("** Alarm Alarm!! **")
    task_instance: TaskInstance = context.get("task_instance")
    print(f"Task Instance: {task_instance} failed!")


default_args = {
    "owner": "mi_empresa",
    "email_on_failure": False,
    "on_failure_callback": _alarm,
}

with DAG(
    dag_id="failure_callback_example",
    start_date=datetime(2021, 9, 7),
    schedule_interval=None,
    default_args=default_args,
    catchup=False,
    on_failure_callback=_on_dag_run_fail,
    # Shorter than the 60-second sleep below, so the run times out.
    dagrun_timeout=timedelta(seconds=45),
) as dag:

    delayed = BashOperator(
        task_id="delayed",
        bash_command='echo "waiting..";sleep 60; echo "Done!!"',
    )

    will_fail = BashOperator(
        task_id="will_fail",
        bash_command="exit 1",
    )

    delayed >> will_fail
You can find the log output of the callback execution in the scheduler logs, under AIRFLOW_HOME/logs/scheduler/date/failure_callback_example:
[2021-09-24 13:12:34,285] {logging_mixin.py:104} INFO - [2021-09-24 13:12:34,285] {dag.py:862} INFO - Executing dag callback function: <function _on_dag_run_fail at 0x7f83102e8670>
[2021-09-24 13:12:34,336] {logging_mixin.py:104} INFO - ***DAG failed!! do something***
[2021-09-24 13:12:34,345] {logging_mixin.py:104} INFO - The DAG failed because: timed_out
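Edit:
In the context dictionary, the key reason indicates why the DAG run failed. Possible values include 'reason': 'timed_out' and 'reason': 'task_failure'. The callback can branch on this reason to run behavior specific to the kind of failure.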
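on_failure_callback will be called. If you want to know why the run failed and act accordingly, you can read the reason key from the context dictionary that is passed in. I will add this to my answer. - NicoE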