Airflow未能正确调度Python代码

14

代码:

Python版本为2.7.x,Airflow版本为1.5.1。

我的DAG脚本如下:

from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta


default_args = {
'owner': 'xyz',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['xyz@email.in'],
'schedule_interval':timedelta(minutes=5),
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing', default_args=default_args)
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
    t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
    t.set_upstream(run_this_first)

从中可以看出,我正在创建一个包含6个任务的DAG,第一个任务(Start1)最先开始,之后所有其他五个任务开始。

目前我在DAG启动之间设置了5分钟的延迟时间。

在第一次成功运行六个任务后,但是在五分钟后,DAG没有重新启动。

已经超过1小时了,DAG仍未重新启动,我真的不知道哪里出了问题。

如果有人能指出我的错误,那就太好了。我尝试使用airflow testing clear清除,然后再做同样的事情。它运行了第一次实例,然后就停在那里了。

命令行显示的唯一内容是Getting all instance for DAG testing

当我改变schedule_interval的位置时,它只会并行运行而没有任何调度间隔。也就是在5分钟内完成了300个或更多任务实例。没有5分钟的调度间隔。

代码2:

from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta


default_args = {
'owner': 'xyz',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['xyz@email.in'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing',schedule_interval=timedelta(minutes=5),default_args=default_args)#Schedule here
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
    t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
    t.set_upstream(run_this_first)
2个回答

11

对于Code 2,我猜想它每分钟运行的原因是:

  1. 开始时间是2015-10-13 00:00

  2. 调度间隔为5分钟

  3. 每个调度器的心跳(默认为5秒),都会检查您的DAG

    • 第一次检查:起始日期(未找到上次执行日期)+ 调度间隔 < 当前时间?如果是,则将执行DAG并记录最后执行时间。(例如:2015-10-13 00:00 + 5min < current?)
    • 下一个心跳进行第二次检查:最后执行时间+调度间隔<当前时间?如果是,则再次执行DAG。
    • ....

解决方法是将DAG start_date设置为datetime.now() - schedule_interval

如果你想要调试:

  1. 在settings.py中将LOGGINGLEVEL设置为debug

  2. 修改airflow.models.TaskInstance的类方法is_queueable()

:

def is_queueable(self, flag_upstream_failed=False):
    logging.debug('Checking whether task instance is queueable or not!')
    if self.execution_date > datetime.now() - self.task.schedule_interval:
        logging.debug('Too early to execute: execution_date {0} + task.schedule_interval {1} > datetime.now() {2}'.format(self.execution_date, self.task.schedule_interval, datetime.now()))
        return False
        ...

所以你的意思是它会每五秒运行一次,直到执行日期达到当前日期时间,之后将遵循预定的时间间隔。 - The6thSense
是的,那就是我的意思。 - Yongyiw
非常感谢,伙计,但我有两个疑问。我如何安排一个任务,从此刻开始,每隔一小时执行一次?我能否安排一个未来的工作? - The6thSense

4

由于启动时间(2015-10-13 00:00)早于现在的时间,因此触发了airflowbackfill。它将从2015-10-13 00:00开始运行,当每秒钟airflow调度程序检测到时(即为“开始日期”),但执行日期在5分钟(任务间隔时间)之间。

请查看日志名称:

$tree airflow/logs/testing/
testing/
|-- Orders10
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   -- 2015-10-13T00:10:00
|-- Orders11
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   -- 2015-10-13T00:10:00
|-- Orders12
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   -- 2015-10-13T00:10:00
|-- Orders13
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   -- 2015-10-13T00:10:00
|-- Orders14
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   -- 2015-10-13T00:10:00
-- Start1
    |-- 2015-10-13T00:00:00
    |-- 2015-10-13T00:05:00
    |-- 2015-10-13T00:10:00
    -- 2015-10-13T00:15:00

查看日志创建时间:

$ll airflow/logs/testing/Start1
-rw-rw-r-- 1 admin admin 4192 Nov  9 14:50 2015-10-13T00:00:00
-rw-rw-r-- 1 admin admin 4192 Nov  9 14:50 2015-10-13T00:05:00
-rw-rw-r-- 1 admin admin 4192 Nov  9 14:51 2015-10-13T00:10:00
-rw-rw-r-- 1 admin admin 4192 Nov  9 14:52 2015-10-13T00:15:00

此外,您可以在Web界面上查看任务实例:

air flow Task Instances


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接