如何运行指定次数的Airflow DAG?
我尝试使用TriggerDagRunOperator,这个操作符对我来说很有效。在可调用函数中,我们可以检查状态并决定是否继续。
但是,当前计数和状态需要得到维护。
使用上述方法,我能够重复运行DAG。
需要专家意见,有没有其他更深入的方法来运行X次Airflow DAG? 谢谢。
如何运行指定次数的Airflow DAG?
我尝试使用TriggerDagRunOperator,这个操作符对我来说很有效。在可调用函数中,我们可以检查状态并决定是否继续。
但是,当前计数和状态需要得到维护。
使用上述方法,我能够重复运行DAG。
需要专家意见,有没有其他更深入的方法来运行X次Airflow DAG? 谢谢。
None
,然后使用API触发运行,但这是在外部进行的,因此需要维护计数和状态以确定何时以及为什么在外部触发。start_date
设置为11天前的固定日期(不要动态设置),将end_date
设置为今天(也是固定的),然后添加每日schedule_interval
和max_active_runs
为1,这样你就可以得到确切的10次运行,并且它们会连续运行而不重叠,同时相应地更改execution_date
,然后停止。或者你可以使用airflow backfill
来执行一段时间范围内的任务,使用None
作为调度DAG。0 0/2 * * *
),并将max_active_runs
设置为1,这样如果先前的运行尚未完成,则下一个运行将等待,然后在先前的运行完成后启动。请参见https://airflow.apache.org/faq.html#why-isn-t-my-task-getting-scheduled中的最后一个项目。depends_on_past
,这些任务本身不应同时运行(例如创建、插入或删除临时表),或者使用具有单个插槽的池。
如果下一个计划已经准备好启动,没有任何功能可以杀死先前的运行。如果先前的运行尚未完成,则可能跳过当前运行,但我忘记了具体如何操作。
这基本上是你的大部分选择。此外,您可以为未安排的DAG创建手动dag_run
;当您感觉需要时,每次创建10个(使用UI或CLI而不是API,但API可能更容易)。
这些建议中的任何一个是否解决了您的问题?由于不清楚您想要固定数量的运行次数,频率,以及使用什么计划和条件,因此很难提供具体的建议。
BranchOperator
来结束任务。该操作符将分支到一个虚拟的END任务或者是一个TriggerDagRunOperator
,并且会减少Airflow变量或其他外部数据源(如DB、http get/put/post、S3/GCP路径中的值等)以确定分支路径。 - dlamblintask_command.py
。注意
这种方法的缺点是假定任务(TaskInstance
)的历史运行将永久保留(并且正确)
catchup
设置为False
)