Apache Airflow scheduler fails to schedule jobs


I am using Apache Airflow 1.8.0.

Here is the output when I execute a backfill task:

[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00     [scheduled]>
[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00 [scheduled]>
[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]>
[2017-04-13 09:42:55,864] {models.py:1120} INFO - Dependencies not met for <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:44:00 [scheduled]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 3 non-success(es). upstream_tasks_state={'skipped': Decimal('0'), 'successes': Decimal('0'), 'done': 0, 'upstream_failed': Decimal('0'), 'failed': Decimal('0')}, upstream_task_ids=['runme_0', 'runme_1', 'runme_2']

When I try to schedule any DAG, it throws this error:

Traceback (most recent call last):
  File "/anaconda3/bin/airflow", line 28, in <module>
    args.func(args)
  File "/anaconda3/lib/python3.5/site-packages/airflow/bin/cli.py", line 167, in backfill
    pool=args.pool)
  File "/anaconda3/lib/python3.5/site-packages/airflow/models.py", line 3330, in run
    job.run()
  File "/anaconda3/lib/python3.5/site-packages/airflow/jobs.py", line 200, in run
    self._execute()
  File "/anaconda3/lib/python3.5/site-packages/airflow/jobs.py", line 2021, in _execute
    raise AirflowException(err)
airflow.exceptions.AirflowException: ---------------------------------------------------

And here is the output about the tasks:
BackfillJob is deadlocked. These tasks have succeeded:
set()
 These tasks have started:
{}
 These tasks have failed:
set()
 These tasks are skipped:
set()
 These tasks are deadlocked:
{<TaskInstance: example_bash_operator.runme_0 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:44:00 [scheduled]>}

Tested with Python 2.7 and Python 3.5.

Tried with both SequentialExecutor and LocalExecutor.

PS: If I backfill the DAG at the current time, it executes once and then throws the above error for all the scheduled tasks.


Are you also seeing a similar issue with Airflow 1.9? Any workaround? - mad_
@mad_ To get jobs scheduled properly, check the following: 1) make sure the DAG has a constant start_date, not something like datetime.utcnow(); 2) the schedule_interval is not set to None; 3) make sure the DAG has not been backfilled before. If you want to backfill a DAG, you need to clear it first with airflow clear <dag_id>. Let me know if this helps. - Mubin
If I don't set the date argument to now() or datetime.datetime.now(), how do I schedule programmatically? Do I need to pass the date manually? It wasn't like this in 1.7, or maybe I'm missing something? - mad_
I'm trying to run the DAG in a local scope inside a function. - mad_
Only DAGs in the global namespace get detected. - Mubin
Specify the start date like this => datetime(2018, 4, 5, 5, 0) - Mubin
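
Putting the comments above together, here is a minimal sketch of a DAG definition the scheduler will pick up (the dag_id, schedule, and task below are illustrative assumptions, not taken from the question):

# Airflow 1.8-era imports; dag_id and schedule are illustrative only.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 4, 5, 5, 0),  # constant, NOT datetime.utcnow()
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define the DAG at module (global) scope; DAGs created inside a
# function's local scope are not discovered by the scheduler.
dag = DAG(
    dag_id='example_fixed_start_date',
    default_args=default_args,
    schedule_interval='@daily',  # a real interval, not None
)

run_me = BashOperator(
    task_id='run_me',
    bash_command='echo hello',
    dag=dag,
)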
1 Answer


Your Airflow instance is in a deadlocked state. The tasks that failed are blocking future runs of those tasks.

Airflow launches the tasks of each DAG run as new processes; when a task fails and that failure is not handled, this deadlock situation appears.

To get out of this situation, you can do one of the following:

  1. Use airflow clear <dag_id>. This resolves the deadlock and allows future runs of the DAG/tasks.
  2. If that does not solve the issue, you will need to use airflow resetdb. This wipes the Airflow database and thereby resolves the issue.

Going forward, I'd recommend that you:

  • Try setting an explicit timeout on your operators with execution_timeout=timedelta(minutes=2), so you have explicit control over how long a task may run.
  • Also provide an on_failure_callback=handle_failure so the operator exits cleanly on failure (see the sketch after this list).
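
A hedged sketch of those two settings (handle_failure is a hypothetical callback name, and the dag object is assumed to exist as in the sketch above):

from datetime import timedelta

from airflow.operators.bash_operator import BashOperator


def handle_failure(context):
    # Airflow calls this with the task context when the task fails;
    # log or alert here so the failure is handled explicitly.
    ti = context['task_instance']
    print('Task %s failed for %s' % (ti.task_id, ti.execution_date))


guarded = BashOperator(
    task_id='runme_guarded',
    bash_command='echo guarded',
    execution_timeout=timedelta(minutes=2),  # hard cap on task runtime
    on_failure_callback=handle_failure,      # explicit failure handling
    dag=dag,  # assumes the DAG object from the earlier sketch
)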

Hope this helps, cheers!

