如何使用TriggerDagRunOperator触发Airflow中的dag?

18

我找到以下链接:

https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand

它确实解释了如何使用TriggerDagRunOperator来执行独立的Airflow DAG。文档使用了Airflow自己的示例DAG,但我很难理解它们,因为它们没有使用任何sensor。

有人可以解释一下如何使用TriggerDagRunOperatorSqlSensor启动独立的DAG吗?当我的SQL Server作业任务完成时,我正在尝试启动单独的DAG。我知道如何使用SqlSensor检查SQL Server作业的状态,但我不知道如何将结果附加到TriggerDagRunOperator以启动单独的DAG。

我不想使用Airflow CLI或在一个DAG中完成两个任务。基本上,我希望这只是触发器DAG。

以下是我当前的代码,缺少关键的conditionally_trigger

# File Name: check-when-db1-sql-task-is-done

from airflow import DAG
from airflow.operators import TriggerDagRunOperator
from airflow.operators import SqlSensor
from datetime import datetime


default_args = {
        'owner': 'airflow',
        'retry_delay': timedelta(minutes=5),
}

dag = DAG('check-when-db1-sql-task-is-done',
        description='Check-when-DB1-SQL-task-is-done',
        default_args=default_args,
        schedule_interval='@once',
        start_date=datetime.now(),
        )

# returns-0-or-1-based-on-job-task-status
sqlsensor = SqlSensor (
        task_id='sql-sensor',
        poke_interval=30,
        timeout=3200,
        sql="""select last_run_outcome from msdb.dbo.sysjobsteps where job_id = '249A5A5D-6AFC-4D6B-8CB1-27C16724A450' and step_id = '1' and last_run_date = (select convert(varchar(24),getdate(),112)); """,    
        mssql_conn_id='db1',
        dag=dag,
        )

# dag-to-start
trigger = TriggerDagRunOperator (
        task_id='start-ssh-job',
        trigger_dag_id="qa-knime-ssh-task",
        python_callable=conditionally_trigger,
        params={'condition_param': True,
                'message': 'Hello World'},
        dag=dag)
1个回答

9
我的理解是,TriggerDagRunOperator 用于在想要使用Python函数来确定是否触发SubDag时使用。该函数在您的代码和示例中称为 conditionally_trigger
在您的情况下,您正在使用传感器来控制流程,并且不需要传递函数。您可以使用 SubDagOperator 替代 TriggerDagRunOperator 或将一个简单的始终为真的函数作为 python_callable 传递。
...
python_callable=lambda(context, dag_run_obj):dag_run_obj,
...

这正是我在被卡住后寻找的确认,在exampleTriggerDagRunOperator docs中遇到问题。你能否支持(或反对)以下说法:如果没有将参数(run_idpayload)传递给新触发DAG并在python_callable中返回True,是否就足够了呢?(显然它没有被设计成那样,但出于了解的目的..)? - y2k-shubham
一个语法问题:在那个lambda参数中,是否真的需要括号(我是Python新手)? - y2k-shubham
1
括号是可选的。对于您的另一个问题,我认为答案是:是的,所有重要的是python_callable返回true以表示子DAG应该运行;如果python_callable返回false,则不运行子DAG。对于您的用途,看起来SubDagOperator更适合。 - 7yl4r

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接