我在我的airflow DAG中运行了5个PythonOperator任务,其中一个正在执行需要很长时间的ETL作业,这导致所有资源都被阻塞。有没有办法为每个任务设置最大执行时间,在此时间之后,任务要么失败要么标记为成功(以便DAG不会失败)并显示消息?
我在我的airflow DAG中运行了5个PythonOperator任务,其中一个正在执行需要很长时间的ETL作业,这导致所有资源都被阻塞。有没有办法为每个任务设置最大执行时间,在此时间之后,任务要么失败要么标记为成功(以便DAG不会失败)并显示消息?
在每个运算符中,我们都有一个execution_timeout
变量,您需要传递一个datetime.timedelta
对象。
根据基本操作员代码注释:
:param execution_timeout: max time allowed for the execution of
this task instance, if it goes beyond it will raise and fail.
:type execution_timeout: datetime.timedelta
请记住,这将导致DAG的单次运行失败并触发重新运行,并且只有所有重新运行都失败时才会声明为DAG失败。
因此,根据您分配的自动重试次数,如果代码一直需要太长时间,则可能需要最大潜在时间 ( 重试次数 ) x ( 超时时间 )
。
execution_timeout
任务参数,它应该类似于这样。from datetime import timedelta
sensor = SFTPSensor(
task_id="sensor",
path="/root/test",
execution_timeout=timedelta(hours=2),
timeout=3600,
retries=2,
mode="reschedule",
)