使用Airflow进行批处理,根据父任务的输出动态启动多个任务。

3
我正在尝试确定Airflow是否可以用于表达一个工作流,其中需要基于父任务的输出启动多个相同任务的实例。Airflow支持多个worker,并且我天真地认为可以使用Airflow来编排涉及批处理的工作流程。到目前为止,我没有找到任何适合这种模型的配方/指南。如何正确地利用Airflow进行像下面这样的批处理工作流程?假设有一个Airflow worker池。
工作流示例: 1. 启动任务A以生成多个文件 2. 对于每个文件,请启动Task B的一个实例(可能是另一个工作流) 3. 等待所有Task B实例完成后,然后启动Task C

1
据我所知,您可以做到以下几点:1. 使用创建文件后的触发器DAG B和参数{filepath,last file bool}创建一个DAG。2. 处理filepath,如果是最后一个文件,则触发DAG C。 - Andrey Kartashov
@AndreyKartashov 谢谢!您是否偶然遇到过一个或多个示例,展示了您上面描述的工作流中涉及的活动? - kpax
1
这里有两个DAGs:https://github.com/datirium/biowardrobe-airflow-analysis/blob/master/biowardrobe_airflow_analysis/biowardrobe/download.py - 其中一个是监控数据库,如果有数据可用,则触发另一个DAG来处理每个新记录。 - Andrey Kartashov
1个回答

1
作为在Airflow中并行处理输入数据的hack,我使用一个自定义操作符将输入分成预定数量的分区。下游操作符会为每个分区复制,如果需要,结果可以再次合并。对于本地文件,该操作符运行split命令。在Kubernetes中,这与集群自动缩放很好地配合使用。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接