Airflow DAG的任务超时时间

7

我在我的airflow DAG中运行了5个PythonOperator任务,其中一个正在执行需要很长时间的ETL作业,这导致所有资源都被阻塞。有没有办法为每个任务设置最大执行时间,在此时间之后,任务要么失败要么标记为成功(以便DAG不会失败)并显示消息?

3个回答

12

在每个运算符中,我们都有一个execution_timeout变量,您需要传递一个datetime.timedelta对象。

根据基本操作员代码注释

:param execution_timeout: max time allowed for the execution of
    this task instance, if it goes beyond it will raise and fail.
:type execution_timeout: datetime.timedelta

请记住,这将导致DAG的单次运行失败并触发重新运行,并且只有所有重新运行都失败时才会声明为DAG失败。

因此,根据您分配的自动重试次数,如果代码一直需要太长时间,则可能需要最大潜在时间 ( 重试次数 ) x ( 超时时间 )


1

0
此文档中,您需要设置execution_timeout任务参数,它应该类似于这样。
from datetime import timedelta

sensor = SFTPSensor(
    task_id="sensor",
    path="/root/test",
    execution_timeout=timedelta(hours=2),
    timeout=3600,
    retries=2,
    mode="reschedule",
)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接