我需要几个相同的(仅参数不同)顶级DAG
,它们也可以同时触发,具有以下约束/假设:
- 单个顶级DAG将具有
schedule_interval=None
,因为它们只需要偶尔进行手动触发 - 然而,DAG系列需要每天运行
- 顺序和数量在系列中的DAG是固定的(编写代码之前已知),并且很少更改(每隔几个月一次)
- 无论DAG是成功还是失败,触发链都不能中断
- 目前它们必须按系列一起运行;将来可能需要并行触发
所以我为dags
目录中的每个DAG创建了一个文件,现在我必须将它们连接起来以进行连续执行。我已经确定了两种方法可以实现这一点:
SubDagOperator
TriggerDagRunOperator
- 在我的演示中可以工作,但以并行方式运行(不是顺序),因为它在移动到下一个之前不会等待触发的DAG完成
ExternalTaskSensor
可能有助于克服上述限制,但会使事情非常混乱
我的问题是
- 如何克服
SubDag
的dag_id
中parent_id
前缀的限制? - 如何强制
TriggerDagRunOperator
等待DAG完成? - 有没有其他/更好的方法将独立(顶级)DAG连接在一起?
- 是否有解决方案可以为每个顶级DAG创建单独的文件(对于仅在输入方面不同的DAG)?
我正在使用puckel/docker-airflow,其中包括:
Airflow 1.9.0-4
Python 3.6-slim
CeleryExecutor
与redis:3.2.7
配合使用
编辑-1
澄清@Viraj Parekh的查询
您能否详细说明一下在触发之前等待DAG完成的含义?
当我触发import_parent_v1
DAG时,它应该使用TriggerDagRunOperator
启动的所有3个外部DAG都会并行运行,即使我将它们顺序链接。实际上,日志表明,虽然它们是一个接一个地启动的,但在前一个结束之前执行已经转移到下一个DAG(TriggerDagRunOperator
)。
注意:在此示例中,顶级DAG的名称为
importer_child_v1_db_X
,它们对应的task_id
(针对TriggerDagRunOperator
)的名称为importer_v1_db_X
我必须将几个类似的DAG链接在一起,形成一个工作流,逐个触发它们。因此,不只是一个可以放在最后的TriggerDagRunOperator,而是许多(这里有3个,但在生产环境中可能会达到15个)。TriggerDagRunOperator是否可以成为DAG中的最后一个任务?
Airflow v2.0
之前不应扩展SubDagOperator
。 - y2k-shubhamAirflow
的dev
邮件列表上提出的问题的 链接。该邮件列表的链接为:(https://lists.apache.org/list.html?dev@airflow.apache.org)。 - y2k-shubham