针对每个文件运行Airflow DAG

4
所以我在airflow中有一个相当不错的DAG,它基本上对二进制文件运行了几个分析步骤(作为airflow插件实现)。 DAG由ftp sensor触发,该sensor只是检查ftp服务器上是否有新文件,然后启动整个工作流程。
目前的工作流程如下:DAG按照定义被触发->sensor等待ftp上的新文件->执行分析步骤->工作流程结束。
我想要的是这样的东西:DAG被触发-> sensor等待ftp上的新文件->对于ftp上的每个文件,单独执行分析步骤->每个工作流程单独结束。
怎么才能使分析工作流程针对ftp服务器上的每个文件执行,并且如果服务器上没有文件,就只需一个sensor等待新文件即可呢?我不想例如每秒启动一个DAG,因为那样我会有很多sensor一直在等待新的文件。
1个回答

3

使用2个DAG将感知步骤与分析步骤分开。

DAG 1:

传感器在ftp上等待新文件 -> 新文件到达后,使用TriggerDagRunOperator触发DAG 1本身 -> 使用TriggerDagRunOperator触发DAG 2

DAG 2:

对文件执行分析步骤。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接