面向用户的工作流程的Airflow DAG设计

3
我们正在考虑使用Airflow来替代我们目前基于rq的自定义工作流程,但我不确定最佳的设计方式是什么。或者是否有必要使用Airflow。
使用案例是:
  1. 我们从用户那里获取数据上传。
  2. 根据接收到的数据类型,我们可以选择运行零个或多个作业。
  3. 每个作业在一定条件下运行,其运行时间取决于接收到的数据。
  4. 该作业从数据库中读取数据,并将结果写入另一个数据库。
  5. 进一步的作业可能会因这些作业而被触发。
例如:
在数据上传后,我们将项目放入队列中:
上传:
user: 'a'
data:
 - type: datatype1
   start: 1
   end: 3
 - type: datatype2
   start: 2
   end: 3

这将触发:

  • job1,用户 'a',开始时间:1,结束时间:3
  • job2,用户 'a',开始时间:2,结束时间:3

然后也许 job1 将有一些清理工作在其之后运行。(如果可能的话,将任务限制为仅在没有其他用户运行的任务时运行会很好。)

我考虑过的方法:

1.

当数据上传到消息队列时触发 DAG。

然后此 DAG 确定要运行的依赖作业,并将用户和时间范围作为参数(或 xcom)传递。

2.

当数据上传到消息队列时触发 DAG。

然后此 DAG 根据用户和时间范围中的数据类型和模板动态创建作业的 DAG。

因此,您获得每个用户、作业、时间范围组合的动态 DAG。


我甚至不确定如何从消息队列触发 DAG...而且很难找到类似于此用例的示例。也许这是因为 Airflow 不适合?

任何帮助/想法/建议都将不胜感激。

谢谢您。

1个回答

3
空气流动是围绕时间表构建的。它不是为了基于数据到达而触发运行而构建的。有其他系统设计来代替这样做。我听说过像pachyderm.io或者dvs.org之类的东西。甚至可以重新利用CI工具或自定义Jenkins设置,以基于文件更改事件或消息队列触发。
然而,您可以尝试使用外部队列侦听器使用rest API调用Airflow来触发DAG。例如,如果队列是AWS SNS队列,则可以使用简单的Python编写AWS Lambda侦听器来实现这一点。
我建议每个作业类型(或用户,取决于哪个较少)一个DAG,由触发逻辑根据队列确定正确的DAG。如果有共同的清理任务之类的任务,则DAG可能会使用TriggerDagRunOperator来启动这些任务,或者您可能只需拥有每个DAG都包含的通用库中的这些清理任务。我认为后者更加干净。
DAG可以将其任务限制在特定的池中。您可以为每个用户创建一个池,以限制每个用户的作业运行次数。或者,如果您为每个用户设置了一个DAG,则可以将该DAG的最大并发DAG运行设置为合理值。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接