我有一个 DAG
,每当它成功或失败时,我希望它触发一个方法来发布到 Slack。
我的 DAG args
如下:
default_args = {
[...]
'on_failure_callback': slack.slack_message(sad_message),
'on_success_callback': slack.slack_message(happy_message),
[...]
}
下面是DAG
的定义:
dag = DAG(
dag_id = dag_name_id,
default_args=default_args,
description='load data from mysql to S3',
schedule_interval='*/10 * * * *',
catchup=False
)
但是当我检查Slack时,每分钟有超过100条消息,好像它在每个调度程序心跳时进行评估,并且对于每个日志,它都运行了成功和失败方法,就好像它对于相同的任务实例既起作用又不起作用(不好)。
我应该如何正确使用on_failure_callback
和on_success_callback
来处理dags状态并调用自定义方法?
on_failure_callback
和on_success_callback
?因为您需要在任务成功/失败时请求消息。请参阅此https://dev59.com/dqLia4cB1Zd3GeqPmaVc的副本。 - Zack