Airflow - 处理DAG回调的正确方法

4

我有一个 DAG,每当它成功或失败时,我希望它触发一个方法来发布到 Slack。

我的 DAG args 如下:

default_args = {
    [...]
    'on_failure_callback': slack.slack_message(sad_message),
    'on_success_callback': slack.slack_message(happy_message),
    [...]
}

下面是DAG的定义:

dag = DAG(
    dag_id = dag_name_id,
    default_args=default_args,
    description='load data from mysql to S3',
    schedule_interval='*/10 * * * *',
    catchup=False
      )

但是当我检查Slack时,每分钟有超过100条消息,好像它在每个调度程序心跳时进行评估,并且对于每个日志,它都运行了成功和失败方法,就好像它对于相同的任务实例既起作用又不起作用(不好)。

我应该如何正确使用on_failure_callbackon_success_callback来处理dags状态并调用自定义方法?


为什么不将Slack消息作为DAG中的任务,而是使用on_failure_callbackon_success_callback?因为您需要在任务成功/失败时请求消息。请参阅此https://dev59.com/dqLia4cB1Zd3GeqPmaVc的副本。 - Zack
不是重复的问题,这个问题特别涉及到成功/失败回调函数的使用。 - cwurtz
3个回答

9

它创建消息的原因是因为在定义default_args时,您正在执行函数。 您只需要传递函数定义而不执行它。

由于函数有一个参数,这会变得有点棘手。 您可以定义两个部分函数或定义两个包装器函数。

所以你可以选择:

from functools import partial

success_msg = partial(slack.slack_message, happy_message);
failure_msg = partial(slack.slack_message, sad_message);

default_args = {
    [...]
    'on_failure_callback': failure_msg
    'on_success_callback': success_msg
    [...]
}

或者

def success_msg():
    slack.slack_message(happy_message);

def failure_msg():
    slack.slack_message(sad_message);

default_args = {
    [...]
    'on_failure_callback': failure_msg
    'on_success_callback': success_msg
    [...]
}

无论哪种方法,注意只传递函数定义 failure_msgsuccess_msg,而不是它们执行时的结果。

它确实有效。但似乎生成了一条消息,发送给每个任务而不是每个DAG运行。 我现在的代码是在每个DAG运行时运行的,我正在考虑将其调整为在每个任务运行时运行,但是那么我需要检索它的“task_id”,这可能吗? 另一种方法是让它在每个DAG运行时只运行一次,这样我就不需要更改代码了。这也可能吗? 非常感谢。 - Julinho da Adelaide
@JulinhodaAdelaide,我认为1.9.0版本只能在每个任务级别上定义此项,而1.10.0版本将提供基于DAG的定义。 - tobi6
tobi6,你知道我怎么才能检索到task_id吗? - Julinho da Adelaide

5

默认参数在任务级别上展开,因此它变成了每个任务的回调函数

在“default_args”之外,在DAG标志级别应用该属性


回调参数在default_args之外无法工作。它必须在default_args中,如任务级别所述。 - Dhruv Kadia

0

你提到的 slack 方法是什么?调度程序会在每个心跳时解析你的DAG文件,所以如果slack函数在你的代码中被定义,它将在每次心跳时运行。

一些你可以尝试的事情:

  • 将要调用的函数定义为PythonOperators,然后在任务级别而不是DAG级别调用它们。

  • 你也可以使用TriggerRules来设置依赖于你ETL任务的下游任务,这些任务将根据父任务的成功或失败触发。

从文档 定义任务触发所应用的依赖关系规则。选项有:{all_success | all_failed | all_done | one_success | one_failed | dummy}

你可以在这里找到一个示例(完全透明——我是作者)。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接