Airflow DAG 循环 - 如何使每次迭代顺序执行而不是并行执行

4

我有一个Apache Airflow DAG,如下所示:

DAG_NAME='my_dag'
sections = ["0", "1", "2", "3"]

with DAG(DAG_NAME, default_args=default_args, schedule_interval=None) as dag:

        for s in sections:
            a = DummyOperator(task_id=f"section_{s}_start")
            b = SubDagOperator(task_id=f"init_{s}_subdag",subdag=init_section(DAG_NAME,f"init_{s}_subdag", default_args))
            c = SubDagOperator(task_id=f"process_{s}_subdag", subdag=process_section(DAG_NAME,f"process_{s}_subdag", default_args))
            d = SubDagOperator(task_id=f"update_{s}_subdag", subdag=update_section(DAG_NAME,f"update_{s}_subdag", default_args))
            e = DummyOperator(task_id=f"section_{s}_end")
            a>>b>>c>>d>>e

这段代码可以将我的任务渲染成这样:enter image description hereenter image description here 我该如何使任务序列变为: section_0_start>>init_0_subdag>>process_0_subdag>>update_0_subdag>>section_0_end section_0_end>>section_1_start section_1_start>>init_1_subdag>>process_1_subdag>>update_1_subdag>>section_1_end
......
等等,从第 0 部分开始,以第 3 部分任务结束
谢谢。

你确定需要子DAG来完成这个任务吗? - Meghdeep Ray
@MeghdeepRay 每个子DAG内部都有更多需要并行运行的任务。例如,我在每个处理子DAG中读取5个文件,这些文件需要并行运行。此外,我希望我的团队可以重用我的子DAG来编写他们的代码,因此我正在尝试制作一个通用模板。您是否有其他更好的实现方式? - banditKing
1个回答

6

将 for 循环修改为:

    previous_e = None
    for s in sections:
        a = ...
        ...
        e = ...
        if previous_e:
            previous_e >> a
        a>>b>>c>>d>>e
        previous_e = e

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接