我正在使用Airflow来调度批处理作业。我有一个每晚运行的DAG(A)和一个每月运行一次的DAG(B)。B依赖于A成功完成。但是B运行时间较长,因此我想将其保留在单独的DAG中,以便更好地进行SLA报告。
如何使运行DAG B依赖于当天DAG A成功运行?
您可以使用一个名为ExternalTaskSensor的操作符来实现此行为。在DAG(A)中的任务(A2)成功后,DAG(B)中的任务(B1)将被调度并等待。
看起来TriggerDagRunOperator也可以使用,并且您可以使用Python可调用函数添加一些逻辑。 如此处所述:https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand
当需要跨DAG依赖关系时,通常有两个要求:
Task B1
on DAG B
needs to run after task A1
on DAG A
is done. This can be achieved using ExternalTaskSensor
as others have mentioned:
B1 = ExternalTaskSensor(task_id="B1",
external_dag_id='A',
external_task_id='A1',
mode="reschedule")
When user clears task A1
on DAG A
, we want Airflow to clear task B1
on DAG B
to let it re-run. This can be achieved using ExternalTaskMarker
(since Airflow v1.10.8).
A1 = ExternalTaskMarker(task_id="A1",
external_dag_id="B",
external_task_id="B1")
ExternalTaskMarker
目前无法使用。 - Zach