Apache Airflow 任务卡在“up_for_retry”状态

7
我正在我们的系统上设置一个airflow集群,之前它一直在工作。我不确定我做了什么来改变这个情况。
我有一个DAG需要按计划运行。为确保其正常工作,我还想手动触发它。但目前两者都似乎无法工作,任务实例没有写入任何日志。唯一可用的日志是airflow调度程序日志,通常看起来很健康。
我一直看到这个消息:
“Task is not ready for retry yet but will be retried automatically. Current date is 2018-12-12T11:34:46.978355+00:00 and task will be retried at 2018-12-12T11:35:08.093313+00:00.”
然而,如果我等一会儿,完全相同的消息再次出现,只是时间稍微向前移动了一点。因此,似乎任务根本没有被重新尝试过。
我正在使用LocalExecutor,任务是SSHOperator。下面是简化的代码。它只是ssh到另一台机器并启动一堆带有预定目录结构的应用程序。
DB_INFO_FILE = 'info.json'
START_SCRIPT = '/bin/start.sh'
TIME_IN_PAST = timezone.convert_to_utc(datetime.today() - 
timedelta(days=1))

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': TIME_IN_PAST,
    'email': ['some_email@blah.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

def _extract_instance_id(instance_string):
    return re.findall(r'\d+', instance_string)[0]

def _read_file_as_json(file_name):
    with open(file_name) as open_file:
         return json.load(open_file)

DB_INFO = _read_file_as_json(DB_INFO_FILE)
CONFIG_CLIENT = ConfigDbClient(**DB_INFO)

APP_DIRS = CONFIG_CLIENT.get_values('%my-app-info%')

INSTANCE_START_SCRIPT_PATHS = {
    _extract_instance_id(instance_string): directory+START_SCRIPT
    for instance_string, directory in APP_DIRS.items()
    }

# Create an ssh hook which refers to pre-existing connection information
# setup and stored by airflow
SSH_HOOK = SSHHook(ssh_conn_id='my-conn-id')

# Create a DAG object to add tasks to
DAG = DAG('my-dag-id',
          default_args=DEFAULT_ARGS)

# Create a task for each app instance.
for instance_id, start_script in INSTANCE_START_SCRIPT_PATHS.items():
    task = SSHOperator(
        task_id='run-script-{0}'.format(instance_id),
        command='bash {0}'.format(start_script),
        ssh_hook=SSH_HOOK,
        dag=DAG)

当我通过命令行单独运行任务时,它可以正常工作,但是通过UI界面无法运行。似乎我可以运行任务,但是无法触发DAG运行。我尝试了许多start_date和interval计划的组合,只是为了检查一下。

有任何想法吗?

是的,我知道这个问题以前已经被问过了,我看过所有的解决方案,但没有一个能帮助我。


我似乎找到了原因。当我运行 airflow scheduler,包括内置的守护进程 airflow scheduler -D 时,它似乎可以工作。然而,当我运行使用 airflow 推荐的单元文件设置的守护进程时,任务要么失败,要么被卡在“up_for_retry”状态中。 - shwifty chill
2个回答

6

哦,你的start_date正在以与计划间隔周期相同或更快的速度变化。

以下是调度程序每隔几秒钟看到的内容:

start_date: 2018-12-11T12:12:12.000Z  # E.G. IFF now is 2018-12-12T12:12:12.000Z, a day ago
schedule_interval: timedelta(days=1)  # The default

这是调度器运行DAG所需的条件:上次运行发生在一个以上的调度间隔之前。如果没有计划运行,第一次计划运行可以从现在开始,如果自start_date以来已经过去了一个完整的调度间隔,因为这是execution_date的最早可允许日期。在这种情况下,应创建dag_run,并将execution_date设置为该时间段开头的时间。然后,对于DAG中任何依赖项已满足的任务,可以创建task_instance,只要task_instance execution_date在DAG的start_date之后(这不存储在dag_run对象中,而是通过加载DAG文件进行重新计算,仅在检查DAG状态时)。

因此,它无法自动调度的原因是开始日期保持改变,就像间隔被满足一样。但是,如果它是-2d,至少会安排一个运行,然后任何进一步的运行都必须等待1d才能安排。不过,如果您只需在start_date上设置固定的datetime,那么更容易。

但是你手动运行时遇到了奇怪的重试问题...

您确实启动了一两个手动运行。这些运行将当前时间作为execution_date,除非您指定其他内容。这应该在start_date之后,至少要等到明天才能清除它们以运行。但是,似乎在日志中看到它们失败并被标记为重试,并且也没有减少重试次数。我不确定为什么会这样,但可能是SSHOperator出了问题。

您是否使用[ssh]额外安装了Airflow,以便在Web服务器和调度程序上满足SSHOperator的依赖关系(特别是paramikosshtunnel)?其中一个正在工作,因为我假设它已被添加到数据库并解析并显示在UI中。

如果执行以下操作,您会得到什么:

airflow test my-dag-id run-script-an_instance_id 2018-12-12T12:12:12

你知道调度器和 Web 服务器正在循环填充它们的 DAG 包,并且每天重新运行该 DAG 文件几千次,重新加载那个 JSON(它是本地访问,类似于导入模块),并使用 DB 查找重新创建那个 SSHHook。我没有看到设置此钩子时做任何花哨的事情,为什么不从 SSHOperator 中删除 ssh_hook 并将其替换为 ssh_conn_id='my-conn-id',以便可以在执行时创建一次?虽然我怀疑这不是导致重试滚动的问题。

2

在我注意到之前,我的一个任务被卡在up_for_retry状态中将近24小时,而这与start_date、end_date或其他初学者经常遇到的问题无关。

最终,我阅读了源代码,并发现如果它们是回填DAG运行的一部分,Airflow会以不同的方式处理up_for_retry任务。

因此,我连接到元数据数据库,并在对应于卡住的任务的dag_run行中将backfill_更改为scheduled__。Airflow立即开始运行卡住的任务。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接