我们正在考虑使用Airflow来替代我们目前基于rq的自定义工作流程,但我不确定最佳的设计方式是什么。或者是否有必要使用Airflow。
使用案例是:
在数据上传后,我们将项目放入队列中:
上传:
使用案例是:
- 我们从用户那里获取数据上传。
- 根据接收到的数据类型,我们可以选择运行零个或多个作业。
- 每个作业在一定条件下运行,其运行时间取决于接收到的数据。
- 该作业从数据库中读取数据,并将结果写入另一个数据库。
- 进一步的作业可能会因这些作业而被触发。
在数据上传后,我们将项目放入队列中:
上传:
user: 'a'
data:
- type: datatype1
start: 1
end: 3
- type: datatype2
start: 2
end: 3
这将触发:
- job1,用户 'a',开始时间:1,结束时间:3
- job2,用户 'a',开始时间:2,结束时间:3
然后也许 job1 将有一些清理工作在其之后运行。(如果可能的话,将任务限制为仅在没有其他用户运行的任务时运行会很好。)
我考虑过的方法:
1.
当数据上传到消息队列时触发 DAG。
然后此 DAG 确定要运行的依赖作业,并将用户和时间范围作为参数(或 xcom)传递。
2.
当数据上传到消息队列时触发 DAG。
然后此 DAG 根据用户和时间范围中的数据类型和模板动态创建作业的 DAG。
因此,您获得每个用户、作业、时间范围组合的动态 DAG。
我甚至不确定如何从消息队列触发 DAG...而且很难找到类似于此用例的示例。也许这是因为 Airflow 不适合?
任何帮助/想法/建议都将不胜感激。
谢谢您。