如何将参数传递给Airflow中的on_success_callback和on_failure_callback函数

10

我已经使用on_success_callback和on_failure_callback实现了成功和失败的电子邮件警报。

根据Airflow文档

将一个上下文字典作为单个参数传递给此函数。

如何向这些回调方法传递另一个参数?

以下是我的代码

from airflow.utils.email import send_email_smtp

def task_success_alert(context):
    subject = "[Airflow] DAG {0} - Task {1}: Success".format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1]
        )
    html_content = """
    DAG: {0}<br>
    Task: {1}<br>
    Succeeded on: {2}
    """.format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1], 
        datetime.now()
        )
    send_email_smtp(dag_vars["dev_mailing_list"], subject, html_content)

def task_failure_alert(context):
    subject = "[Airflow] DAG {0} - Task {1}: Failed".format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1]
        )
    html_content = """
    DAG: {0}<br>
    Task: {1}<br>
    Failed on: {2}
    """.format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1], 
        datetime.now()
        )
    send_email_smtp(dag_vars["dev_mailing_list"], subject, html_content)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 13),
    'on_success_callback': task_success_alert,
    'on_failure_callback': task_failure_alert
}

我打算将回调函数移到另一个包中,并将电子邮件地址作为参数传递。


当我在我的任务中使用CONTEXT时,任务会结束,但DAG仍在运行。它永远不会结束。有什么建议吗?我正在使用def task_alert(context):\n dag_id = context ['dag'] .dag_id\n task_id = context ['task_instance'] .task_id我正在调用on_failure_callback = task_alert。 - Akshay Lande
使用xcom_push和xcom_pull。 - jetsun
3个回答

11

您可以在DAG内定义一个函数,调用包中的函数。在调用该函数时,将电子邮件作为参数传递。您可以在DAG级别进一步细化,仅传递电子邮件所需的信息。

from package import outer_task_success_callback
email = 'xyz@example.com'

def task_success_alert(context):
    dag_id = context['dag'].dag_id
    task_id = context['task_instance']. task_id
    outer_task_success_callback(dag_id, task_id, email)
    
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 13),
    'on_success_callback': task_success_alert,
    'on_failure_callback': task_failure_alert
}

在调用您的包中的函数之前,这将允许您进行自定义设置。

另外一件事,airflow具有SMTP电子邮件功能。您可以利用这些功能,而不是编写自己的解决方案。


当我在任务中使用CONTEXT时,任务会完成,但DAG仍在运行。它永远不会结束。有什么建议吗? - Akshay Lande
没有代码示例就无法理解或调试。添加一个问题并打上标签airflow。我会检查并回答如果我能够。 - nightgaunt
def notify_email(context): import inspect"""发送自定义电子邮件提醒。""" import smtplib, ssl from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart sender_email = 'abc@gmail.com' receiver_email = 'xyz@gmail.com' password = "abc" message = MIMEMultipart("alternative") #task_instance = context['task'].task_id dag_instance=context['dag_id'].dag_id 当使用context ['dag_id'] .dag_id时,我的dag会一直运行任务完成并且不发送邮件。 - Akshay Lande
@Gabriel Eckers,Airflow中的SMTP工具仍然提供了将电子邮件作为任务发送的选项。尽管没有on_success选项,但是可以在Airflow DAG内保留电子邮件功能。 - nightgaunt

7
您可以使用partial来创建具有预定义参数的函数,例如:
from functools import partial
new_task_success_alert = partial(task_success_alert, email='your_email')

然后将新功能添加为回调函数:

on_success_callback=new_task_success_alert

这个答案比突出显示的回答要好得多,因为前者依赖于在导入时知道文件名。 - Javier Muñoz

3

您可以创建一个仅用于通过xcoms推送配置设置的任务。 您可以通过context从中拉取配置,因为task_instance对象包含在context中。

def push_configuration(ti, params):
    ti.xcom_push(key='conn_id', value=params)

def task_success_alert(context):
    ti = context.get('ti') 
    params = ti.xcom_pull(key='params', task_ids='Settings')
    ...


step0 = PythonOperator(
        task_id='Settings',
        python_callable=push_configuration,
        op_kwargs={'params': params})

step1 = BashOperator(
        task_id='step1',
        bash_command='pwd',
        on_success_callback=task_success_alert)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接