如何在Airflow中设置DAG之间的依赖关系?

47

我正在使用Airflow来调度批处理作业。我有一个每晚运行的DAG(A)和一个每月运行一次的DAG(B)。B依赖于A成功完成。但是B运行时间较长,因此我想将其保留在单独的DAG中,以便更好地进行SLA报告。

如何使运行DAG B依赖于当天DAG A成功运行?


请参见Wiring top-level DAGs together - y2k-shubham
3个回答

50

您可以使用一个名为ExternalTaskSensor的操作符来实现此行为。在DAG(A)中的任务(A2)成功后,DAG(B)中的任务(B1)将被调度并等待。

External Task Sensor文档


1
但是我们将无法可视化依赖关系,对吧? - nono
2
@nono 是的,你不会。 - p.magalhaes
8
这是推荐的做法吗?我有一个每天需要等待DagA(包括5个任务)和DagB(5个单独的任务)完成的任务。我的DagC应该等待这两个任务都成功后,从数据库中查询两个表格,聚合和连接它们,然后发送一些电子邮件/文件。 - trench
@nono 我猜你可以编写一些代码来解析所有的dag定义文件,找到ExternalTaskSensor dag引用并生成网络图。当使用上游/下游函数(和位移快捷方式)在调度程序中加载dags时,类似的事情一定会发生。这将稍微复杂一些,因为您需要查看所有dag定义。不过这是个好主意,肯定可行。我想另一种方法 - 使用较少的dags和许多子dags - 是这种功能的重点所在。 - Davos

13

是的,triggerDagRunOp可以用于处理DAG之间的依赖关系,但是当您有更多相互依赖的DAG时,该过程会变得棘手。看起来Airflow缺少这个功能。您知道开发人员是否计划朝这个方向发展吗? - ozw1z5rd
我建议你去这个网址:https://cwiki.apache.org/confluence/display/AIRFLOW/Roadmap 或者在 Gitter 或 The Airflow 邮件列表上询问。 - nono

3

当需要跨DAG依赖关系时,通常有两个要求:

  1. Task B1 on DAG B needs to run after task A1 on DAG A is done. This can be achieved using ExternalTaskSensor as others have mentioned:

    B1 = ExternalTaskSensor(task_id="B1",
                            external_dag_id='A',
                            external_task_id='A1',
                            mode="reschedule")
    
  2. When user clears task A1 on DAG A, we want Airflow to clear task B1 on DAG B to let it re-run. This can be achieved using ExternalTaskMarker (since Airflow v1.10.8).

    A1 = ExternalTaskMarker(task_id="A1", 
                            external_dag_id="B",
                            external_task_id="B1")
    
请查看有关跨DAG依赖项的文档以获取更多详细信息:https://airflow.apache.org/docs/stable/howto/operator/external.html

不幸的是,由于https://github.com/apache/airflow/issues/14260,在Airflow 2.0.1中,ExternalTaskMarker目前无法使用。 - Zach

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接