我有一个DAG需要按计划运行。为确保其正常工作,我还想手动触发它。但目前两者都似乎无法工作,任务实例没有写入任何日志。唯一可用的日志是airflow调度程序日志,通常看起来很健康。
我一直看到这个消息:
“Task is not ready for retry yet but will be retried automatically. Current date is 2018-12-12T11:34:46.978355+00:00 and task will be retried at 2018-12-12T11:35:08.093313+00:00.”
然而,如果我等一会儿,完全相同的消息再次出现,只是时间稍微向前移动了一点。因此,似乎任务根本没有被重新尝试过。
我正在使用LocalExecutor,任务是SSHOperator。下面是简化的代码。它只是ssh到另一台机器并启动一堆带有预定目录结构的应用程序。
DB_INFO_FILE = 'info.json'
START_SCRIPT = '/bin/start.sh'
TIME_IN_PAST = timezone.convert_to_utc(datetime.today() -
timedelta(days=1))
DEFAULT_ARGS = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': TIME_IN_PAST,
'email': ['some_email@blah.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1),
}
def _extract_instance_id(instance_string):
return re.findall(r'\d+', instance_string)[0]
def _read_file_as_json(file_name):
with open(file_name) as open_file:
return json.load(open_file)
DB_INFO = _read_file_as_json(DB_INFO_FILE)
CONFIG_CLIENT = ConfigDbClient(**DB_INFO)
APP_DIRS = CONFIG_CLIENT.get_values('%my-app-info%')
INSTANCE_START_SCRIPT_PATHS = {
_extract_instance_id(instance_string): directory+START_SCRIPT
for instance_string, directory in APP_DIRS.items()
}
# Create an ssh hook which refers to pre-existing connection information
# setup and stored by airflow
SSH_HOOK = SSHHook(ssh_conn_id='my-conn-id')
# Create a DAG object to add tasks to
DAG = DAG('my-dag-id',
default_args=DEFAULT_ARGS)
# Create a task for each app instance.
for instance_id, start_script in INSTANCE_START_SCRIPT_PATHS.items():
task = SSHOperator(
task_id='run-script-{0}'.format(instance_id),
command='bash {0}'.format(start_script),
ssh_hook=SSH_HOOK,
dag=DAG)
当我通过命令行单独运行任务时,它可以正常工作,但是通过UI界面无法运行。似乎我可以运行任务,但是无法触发DAG运行。我尝试了许多start_date和interval计划的组合,只是为了检查一下。
有任何想法吗?
是的,我知道这个问题以前已经被问过了,我看过所有的解决方案,但没有一个能帮助我。
airflow scheduler
,包括内置的守护进程airflow scheduler -D
时,它似乎可以工作。然而,当我运行使用 airflow 推荐的单元文件设置的守护进程时,任务要么失败,要么被卡在“up_for_retry”状态中。 - shwifty chill