当超过"dagrun_timeout"时,Airflow会触发"on_failure_callback"。

7

目前正在设置Airflow中长时间运行任务的警报。为了取消/失败airflow dag,我在default_args中放置了"dagrun_timeout",它可以在运行时间过长(通常卡住)时执行我需要的操作,使dag失败/出错。唯一的问题是,当dagrun_timeout超时时,"on_failure_callback"函数不会被调用,因为"on_failure_callback"在任务级别上(我认为),而dagrun_timeout在dag级别上。

如何在dagrun_timeout超时时执行"on_failure_callback"函数,或者如何指定在dag失败时调用函数?还是我应该重新考虑我的方法?

1个回答

6
尝试在DAG声明期间设置on_failure_callback
with DAG(
    dag_id="failure_callback_example",
    on_failure_callback=_on_dag_run_fail,
    ...
) as dag:
...

解释在于default_args中定义的on_failure_callback只会被传递给创建的任务(Tasks),而不是DAG对象本身。

以下是一个例子来验证这种行为:


from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.bash import BashOperator


def _on_dag_run_fail(context):
    print("***DAG failed!! do something***")
    print(f"The DAG failed because: {context['reason']}")
    print(context)


def _alarm(context):
    print("** Alarm Alarm!! **")
    task_instance: TaskInstance = context.get("task_instance")
    print(f"Task Instance: {task_instance} failed!")


default_args = {
    "owner": "mi_empresa",
    "email_on_failure": False,
    "on_failure_callback": _alarm,
}


with DAG(
    dag_id="failure_callback_example",
    start_date=datetime(2021, 9, 7),
    schedule_interval=None,
    default_args=default_args,
    catchup=False,
    on_failure_callback=_on_dag_run_fail,
    dagrun_timeout=timedelta(seconds=45),
) as dag:

    delayed = BashOperator(
        task_id="delayed",
        bash_command='echo "waiting..";sleep 60; echo "Done!!"',
    )
    will_fail = BashOperator(
        task_id="will_fail",
        bash_command="exit 1",
        # on_failure_callback=_alarm,
    )
delayed >> will_fail

您可以在调度器日志中找到回调函数执行的日志信息:AIRFLOW_HOME/logs/scheduler/date/failure_callback_example

[2021-09-24 13:12:34,285] {logging_mixin.py:104} INFO - [2021-09-24 13:12:34,285] {dag.py:862} INFO - Executing dag callback function: <function _on_dag_run_fail at 0x7f83102e8670>
[2021-09-24 13:12:34,336] {logging_mixin.py:104} INFO - ***DAG failed!! do something***
[2021-09-24 13:12:34,345] {logging_mixin.py:104} INFO - The DAG failed because: timed_out

编辑:

context字典中,键reason用于指定DAG运行失败的原因。一些可能的值包括:'reason': 'timed_out''reason': 'task_failure'。根据DAG运行失败的reason在回调函数中执行特定的行为。


这个解决方案的一个问题是,如果DAG中的某个任务失败(导致DAG失败),_on_dag_run_fail将被调用,我需要仅在超时时触发DAG级别回调。 - Anton
嘿@Anton,我不确定我是否理解你的意思。当DAG运行失败时,无论原因如何,DAG on_failure_callback将被调用。如果您想了解失败的原因并相应地采取行动,则可以访问上下文字典中传递的键reason。我会在我的答案中添加这个信息。 - NicoE
你好@NicoE-这正是我所想的,但根据此处文档中所解释的内容,似乎它并不像这样工作。 - Axel Borja
你能否更好地解释一下为什么在default_args中定义的on_failure_callback仅适用于任务而不适用于DAG对象?对我来说这没有任何意义。@NicoE - Luan Carvalho

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接