代码:
Python版本为2.7.x,Airflow版本为1.5.1。
我的DAG脚本如下:
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'xyz',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['xyz@email.in'],
'schedule_interval':timedelta(minutes=5),
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing', default_args=default_args)
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
t.set_upstream(run_this_first)
从中可以看出,我正在创建一个包含6个任务的DAG,第一个任务(Start1)最先开始,之后所有其他五个任务开始。
目前我在DAG启动之间设置了5分钟的延迟时间。
在第一次成功运行六个任务后,但是在五分钟后,DAG没有重新启动。
已经超过1小时了,DAG仍未重新启动,我真的不知道哪里出了问题。
如果有人能指出我的错误,那就太好了。我尝试使用airflow testing clear
清除,然后再做同样的事情。它运行了第一次实例,然后就停在那里了。
命令行显示的唯一内容是Getting all instance for DAG testing
。
当我改变schedule_interval的位置时,它只会并行运行而没有任何调度间隔。也就是在5分钟内完成了300个或更多任务实例。没有5分钟的调度间隔。
代码2:
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'xyz',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['xyz@email.in'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing',schedule_interval=timedelta(minutes=5),default_args=default_args)#Schedule here
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
t.set_upstream(run_this_first)