如何运行Airflow DAG指定次数?

3

如何运行指定次数的Airflow DAG?

我尝试使用TriggerDagRunOperator,这个操作符对我来说很有效。在可调用函数中,我们可以检查状态并决定是否继续。

但是,当前计数和状态需要得到维护。

使用上述方法,我能够重复运行DAG。

需要专家意见,有没有其他更深入的方法来运行X次Airflow DAG? 谢谢。


尝试解释你希望实现的目标,即作业需要运行特定次数。 - tobi6
假设单个DAG有5个任务。我想运行这个DAG 10次。假设单次运行需要2小时或更长时间。简单地说,我不能基于时间安排计划。因此,希望根据我在DAG配置中指定的数量来运行DAG。 - Omkara
2个回答

4
我很抱歉,Airflow完全是基于时间的调度。您可以将计划设置为None,然后使用API触发运行,但这是在外部进行的,因此需要维护计数和状态以确定何时以及为什么在外部触发。
当您说您的DAG可能有5个任务需要运行10次,每次运行需要2小时,并且您无法根据时间表安排它时,这很令人困惑。我们不知道2小时对您有何意义,也不知道为什么必须运行10次,也不知道为什么无法将其安排为每天运行这5个任务一次。通过简单的每日计划,它将在大约相同的时间每天运行一次,而且在任何给定的一天花费比2小时多一点都没有关系。对吗?
你可以将start_date设置为11天前的固定日期(不要动态设置),将end_date设置为今天(也是固定的),然后添加每日schedule_intervalmax_active_runs为1,这样你就可以得到确切的10次运行,并且它们会连续运行而不重叠,同时相应地更改execution_date,然后停止。或者你可以使用airflow backfill来执行一段时间范围内的任务,使用None作为调度DAG。
你的意思是希望它每两个小时连续运行,但有时会运行更长时间,你不希望它重叠运行?那么,你可以将其安排在每两个小时运行一次(0 0/2 * * *),并将max_active_runs设置为1,这样如果先前的运行尚未完成,则下一个运行将等待,然后在先前的运行完成后启动。请参见https://airflow.apache.org/faq.html#why-isn-t-my-task-getting-scheduled中的最后一个项目。
如果你希望你的DAG恰好每两个小时准时运行[考虑到一些调度程序滞后,是的,这是一个问题]并保留先前的运行,则大多数情况下这是默认行为,但你可以向一些重要任务添加depends_on_past,这些任务本身不应同时运行(例如创建、插入或删除临时表),或者使用具有单个插槽的池。

如果下一个计划已经准备好启动,没有任何功能可以杀死先前的运行。如果先前的运行尚未完成,则可能跳过当前运行,但我忘记了具体如何操作。

这基本上是你的大部分选择。此外,您可以为未安排的DAG创建手动dag_run;当您感觉需要时,每次创建10个(使用UI或CLI而不是API,但API可能更容易)。

这些建议中的任何一个是否解决了您的问题?由于不清楚您想要固定数量的运行次数,频率,以及使用什么计划和条件,因此很难提供具体的建议。


非常有帮助的答案,从第四段中找到了一些线索。非常感谢。基本上,我想交叉检查是否有任何设置或配置(基于代码或json)可以使DAG按顺序一个接一个地运行。例如,仅作为类比,考虑我们想要在“for循环”中运行DAG,其中“for循环”由计数器变量控制。如果有这样的功能,则我将不会担心执行单个迭代所需的时间。因此,用户可以手动运行并通过Airflow变量外部控制迭代计数。 - Omkara
从你的评论中可以看出,@Omkara,你可能想尝试在DAG中使用BranchOperator来结束任务。该操作符将分支到一个虚拟的END任务或者是一个TriggerDagRunOperator,并且会减少Airflow变量或其他外部数据源(如DB、http get/put/post、S3/GCP路径中的值等)以确定分支路径。 - dlamblin
是的,正如我之前在帖子中提到的那样,这是我作为第一次尝试所做的。然而,看起来有点笨拙。无论如何。 - Omkara

1
  • 这个功能Airflow本身不支持
  • 但是通过利用元数据库,我们可以自己开发出这个功能

我们可以编写定制操作符/Python操作符。
在运行实际计算之前,请检查元数据库中的任务(TaskInstance表)是否已运行。如果存在,则跳过该任务(使用AirflowSkipException,参考reference)。有关帮助,请参阅task_command.py
这篇优秀的文章可以用作灵感来源:使用Apache Airflow精确运行任务

注意

这种方法的缺点是假定任务(TaskInstance)的历史运行将永久保留(并且正确)

  • 然而,在实践中,我经常发现任务实例(task_instances)会丢失(我们将catchup设置为False)
  • 此外,在大型Airflow部署中,可能需要定期清理元数据库(meta-db),这会使这种方法变得不可行。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接