on_failure_callback在airflow DAG()中无法工作。

3
我想为dag失败和成功调用两个不同的函数。为此,我想在DAG()函数中使用on_failure_callback和on_success_callback。
根据我的要求,这些回调应该在dag级别而不是任务级别上。因此,我在声明dag变量时,在DAG()函数内部编写这些回调函数。
但是,这些回调函数没有被调用。如果我在任务级别上调用相同的函数,则正常工作。
这是我的代码:
def success():
      print("successful")
    
dag = DAG(dag_id='callback_test',schedule_interval=None,default_args=default_args,on_success_callback=success)
    
    def fun1(**kwargs):
        print("function called")
    
    task1 = PythonOperator(
        task_id='task1',
        provide_context=True,
        python_callable=fun1,
        dag=dag
        )
    
    task1
2个回答

0

如果无法正常工作,请使用可重现示例在 https://github.com/apache/airflow/issues 进行报告。最好请针对最新的Airflow版本进行测试,1.10.10是旧版本。 - Elad Kalif
1
我已经让它工作了。在执行以下操作时要小心: on_failure_callback=myfunction 并且不要在函数中包含任何参数,参数是包含在执行上下文中的。 - vrivesmolina

0

如果你想传递参数,可以通过创建高阶函数来实现,就像创建装饰器一样,通过外部函数返回实际的回调函数。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接