我想为dag失败和成功调用两个不同的函数。为此,我想在DAG()函数中使用on_failure_callback和on_success_callback。
根据我的要求,这些回调应该在dag级别而不是任务级别上。因此,我在声明dag变量时,在DAG()函数内部编写这些回调函数。
但是,这些回调函数没有被调用。如果我在任务级别上调用相同的函数,则正常工作。
这是我的代码:
根据我的要求,这些回调应该在dag级别而不是任务级别上。因此,我在声明dag变量时,在DAG()函数内部编写这些回调函数。
但是,这些回调函数没有被调用。如果我在任务级别上调用相同的函数,则正常工作。
这是我的代码:
def success():
print("successful")
dag = DAG(dag_id='callback_test',schedule_interval=None,default_args=default_args,on_success_callback=success)
def fun1(**kwargs):
print("function called")
task1 = PythonOperator(
task_id='task1',
provide_context=True,
python_callable=fun1,
dag=dag
)
task1