如何在循环中实现Airflow DAG

3

我刚开始使用Airflow。我想要在循环中设置一个DAG,使得下一个DAG在上一个DAG完成后启动。这是我想要实现的工作流程:

list_of_files = [......]
for file in list_of_files:
   dag = DAG('pipeline', default_args=default_args, schedule_interval=None)
   t1 = BashOperator('copy_this_file', ....)
   t2 = BashOperator('process_this_file', ...)
   t1.set_downstream(t2)

如果我运行 airflow backfill pipeline -s 2019-05-01,则所有的DAG都会同时启动。
1个回答

3

DAGs之间不能互相依赖,它们是独立的工作流程。相反地,您需要配置任务以相互依赖。您可以拥有一个单一的DAG,其中包含多个执行分支,每个分支对应一个文件,类似于以下内容(未经测试):

dag = DAG('pipeline', ...)
list_of_files = [......]
with dag:
    for file in list_of_files:
       t1 = BashOperator('copy_this_file', ....)
       t2 = BashOperator('process_this_file', ...)
       t1.set_downstream(t2)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接