如何使用Python在Airflow中实现DAG成功后触发另一个DAG?

25

我有一个Python DAG Parent Job 和 DAG Child Job。在Parent Job每天运行成功后,应该触发Child Job中的任务。如何添加外部作业触发器?

我的代码

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('Child Job', default_args=default_args, schedule_interval='@daily')

execute_notebook = PostgresOperator(
  task_id='data_sql',
  postgres_conn_id='REDSHIFT_CONN',
  sql="SELECT * FROM athena_rs.shipments limit 5",
  dag=dag
)

这个回答解决了你的问题吗?如何在Airflow中设置DAG之间的依赖关系? - moon
@LuckyGuess 这个例子展示了一个任务触发另一个DAG中的另一个任务。我认为他想要的是,一个DAG的完成完全触发下一个DAG。如果您能展示一个例子,那就太好了。 - pankaj
2
我强烈建议使用TriggerDagRunOperator来执行反应式触发,而不是使用ExternalTaskSensor来执行基于轮询的触发 - y2k-shubham
@y2k-shubham,如果您能够编写一个像下面写的示例,那么对其他人来说也是学习的。我也遇到了同样的问题。 - pankaj
1
@pankaj 我已经添加了一个答案,展示了TriggerDagRunOperator的用法。 - y2k-shubham
3个回答

35

答案已经在此线程中给出。以下是演示代码:

父dag:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),
}

dag = DAG('Parent_dag', default_args=default_args, schedule_interval='@daily')

leave_work = DummyOperator(
    task_id='leave_work',
    dag=dag,
)
cook_dinner = DummyOperator(
    task_id='cook_dinner',
    dag=dag,
)

leave_work >> cook_dinner

子 DAG:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),
}

dag = DAG('Child_dag', default_args=default_args, schedule_interval='@daily')

# Use ExternalTaskSensor to listen to the Parent_dag and cook_dinner task
# when cook_dinner is finished, Child_dag will be triggered
wait_for_dinner = ExternalTaskSensor(
    task_id='wait_for_dinner',
    external_dag_id='Parent_dag',
    external_task_id='cook_dinner',
    start_date=datetime(2020, 4, 29),
    execution_delta=timedelta(hours=1),
    timeout=3600,
)

have_dinner = DummyOperator(
    task_id='have_dinner',
    dag=dag,
)
play_with_food = DummyOperator(
    task_id='play_with_food',
    dag=dag,
)

wait_for_dinner >> have_dinner
wait_for_dinner >> play_with_food

图片:

Dags

Dags

父Dag

Parent_dag

子Dag

Child_dag


我尝试了这个,但是出现了超时错误。[MainThread] INFO airflow.task.operators - [2020-05-01 09:51:14,444] {{external_task_sensor.py:115}} Poking for RS_Input_Cleansing.events_input_sql on 2020-04-29T23:00:00+00:00 ... [MainThread] ERROR airflow.task - [2020-05-01 09:51:14,508] {{taskinstance.py:1088}} Snap. Time is OUT. Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 955, in _run_raw_task result = task_copy.execute(context=context) - aeapen
如果“父Dag”已经处于成功状态,那么手动触发“子Dag”会运行吗? - aeapen
你可能需要打开另一个问题并发布完整的日志。但是回答你的问题,是的,如果RS_Input_Cleansing.events_input_sql按时完成,子DAG将自动运行。 - moon
父DAG的最后一个任务花费了2小时才完成。这是超时的原因吗?因为我们只设置了1小时的超时时间。 - aeapen
@LuckyGuess,你能看一下这个问题吗?https://dev59.com/Hrroa4cB1Zd3GeqPp7qw#61562437。我真的很喜欢你用例子解释的风格。 - Ria Alves
显示剩余3条评论

11

根据@pankaj的要求请求, 我在此添加了一个片段,描述了使用TriggerDagRunOperator实现反应触发(与ExternalTaskSensor轮询触发相对)。

from typing import List

from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule

# DAG object
my_dag: DAG = DAG(dag_id='my_dag',
                  start_date=..)
..
# a list of 'tail' tasks: tasks that have no downstream tasks
tail_tasks_of_first_dag: List[BaseOperator] = my_magic_function_that_determines_all_tail_tasks(..)
..

# our trigger task
my_trigger_task: TriggerDagRunOperator = TriggerDagRunOperator(dag=my_dag,
                                                               task_id='my_trigger_task',
                                                               trigger_rule=TriggerRule.ALL_SUCCESS,
                                                               external_dag_id='id_of_dag_to_be_triggered')
# our trigger task should run when all 'tail' tasks have completed / succeeded
tail_tasks_of_first_dag >> my_trigger_task

请注意,片段仅供参考;它尚未经过测试。

注意事项/参考资料


它成功了。它的子任务在父任务的成功上运行。我仍然有疑问。我的子任务的DAG是dag = DAG('Child', default_args=default_args, catchup=False, schedule_interval='@daily')。我的父任务被安排在早上8:30运行。子任务在父任务8:30后完成后运行,并且还会在12:00 AM再次运行。我在DAG中漏掉了什么。 - aeapen
2
@aeapen,你可能想将你的子DAG的schedule_interval设置为None。这样它就不会被Airflow自动运行,只有在父DAG完成后才会被触发。 - y2k-shubham

4

我相信你正在寻找SubDags操作符,用于在较大的Dag中运行一个Dag。 请注意,像下面的示例中创建许多子Dag会很快变得混乱,因此我建议将每个子Dag拆分为一个文件,并将它们导入到主文件中。

SubDagOperator易于使用,您只需要给出一个Id、一个子Dag和一个父Dag。

subdag_2 = SubDagOperator(
        task_id="just_some_id", 
        subdag=child_subdag, <---- this must be a DAG
        dag=parent_dag, <----- this must be a DAG
        )

它将会看起来像这样: this

来自 他们的示例库

from airflow import DAG
from airflow.example_dags.subdags.subdag import subdag
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.dates import days_ago
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
def subdag(parent_dag_name, child_dag_name, args):
    dag_subdag = DAG(
            dag_id='%s.%s' % (parent_dag_name, child_dag_name),
            default_args=args,
            schedule_interval="@daily",
            )

    for i in range(5):
        DummyOperator(
                task_id='%s-task-%s' % (child_dag_name, i + 1),
                default_args=args,
                dag=dag_subdag,
                )

    return dag_subdag

DAG_NAME = 'example_subdag_operator'

args = {
        'owner': 'airflow',
        'start_date': days_ago(2),
        }

dag = DAG(
        dag_id=DAG_NAME,
        default_args=args,
        schedule_interval="@once",
        tags=['example']
        )

start = DummyOperator(
        task_id='start-of-main-job',
        dag=dag,
        )

some_other_task = DummyOperator(
        task_id='some-other-task',
        dag=dag,
        )


end = DummyOperator(
        task_id='end-of-main-job',
        dag=dag,
        )

subdag = SubDagOperator(
        task_id='run-this-dag-after-previous-steps',
        subdag=subdag(DAG_NAME, 'run-this-dag-after-previous-steps', args),
        dag=dag,
        )

start >> some_other_task >> end >> subdag

@aeapen,job 是指任务还是 DAG?这段代码确切地实现了您所描述的内容... 您需要做的唯一事情就是在此流水线末尾添加父Dag以匹配您的情况。我将更新DAG以演示它... 但思路是,不是独立拥有两个不同的DAG,而是嵌套DAG,父->子...这对我来说更有意义。 - Bernardo stearns reisen
另一种方法是使用ExternalTaskSensor运算符从一个DAG触发另一个DAG,但在我看来这更加令人困惑。 - Bernardo stearns reisen
@aeapen,我更新了我的解决方案,这符合你的要求了吗? - Bernardo stearns reisen
我的意思是,Job是一个DAG,其中包含多个任务。 - aeapen
那么我需要使用SubDagOperator还是ExternalDagSensor来实现这个目的? - aeapen
显示剩余11条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接