Apache Airflow:执行器报告任务实例已完成(失败),尽管任务状态显示为排队。

20

我们的空气流安装使用CeleryExecutor。 并发配置如下:

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 16

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16

# Are DAGs paused by default at creation
dags_are_paused_at_creation = True

# When not using pools, tasks are run in the "default pool",
# whose size is guided by this config element
non_pooled_task_slot_count = 64

# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16
[celery]
# This section only applies if you are using the CeleryExecutor in
# [core] section above

# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor

# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
celeryd_concurrency = 16

我们有一个每天执行的dag。它按照一种模式以并行方式运行大约一些任务,该模式检测数据是否存在于hdfs中,然后休眠10分钟,并最终上传到s3。

其中一些任务遇到了以下错误:

2019-05-12 00:00:46,212 ERROR - Executor reports task instance <TaskInstance: example_dag.task1 2019-05-11 04:00:00+00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally?
2019-05-12 00:00:46,558 INFO - Marking task as UP_FOR_RETRY
2019-05-12 00:00:46,561 WARNING - section/key [smtp/smtp_user] not found in config

在这些任务中,这种错误会随机发生。当出现此错误时,任务实例的状态会立即设置为up_for_retry,并且工作节点上没有日志。经过一些重试,它们最终会执行并完成。

这个问题有时会导致大量ETL延迟。有人知道如何解决这个问题吗?


1
我也遇到了同样的问题。这是一个简单的DAG。你是怎么解决的? - alltej
@alltej 我在下面提供了我的答案 - GodBlessYou
5个回答

8

1
我们没有使用回填命令。问题出在我们的airflow ETL中。 - GodBlessYou
1
为什么您认为这解决了问题?我想更好地了解这个问题,因为我们也遇到了同样的问题。 - bosnjak

4
我在我的DagRuns中看到非常相似的症状。我认为这是由于ExternalTaskSensor和并发问题所致,因为队列和被杀任务的语言看起来像这样:Executor reports task instance <TaskInstance: dag1.data_table_temp_redshift_load 2019-05-20 08:00:00+00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally? 但是当我查看工作人员日志时,我发现在我的DAG中使用Variable.set设置变量引起了错误。此问题在此处duplicate key value violates unique constraint when adding path variable in airflow dag中有所描述,其中调度程序定期轮询dagbag以动态刷新任何更改。每次心跳时的错误导致了显着的ETL延迟。
您是否在您的wh_hdfs_to_s3 DAG(或其他DAG)中执行任何可能导致错误或延迟/这些症状的逻辑?

我也遇到了这个问题。有什么解决方法吗?我尝试使用不同的dag_id和task_id创建DAG,希望它会被某个dag_id或task_id卡住,但我仍然看到不同的dag_id或task_id出现了问题。 - alltej
检查工作日志是解决问题的关键,即使任务被报告为成功,我在那里发现了一个错误:{taskinstance.py: 845} INFO-依赖项未满足...依赖项“执行日期”失败:执行日期2021-06-16T23:29:31.114396+00:00在未来(当前日期为2021-06-16T23:27:06.958673+00:00) - NicoE

3

我们已经解决了这个问题。让我自问自答:

我们有5个airflow工作节点。在安装flower来监视分布到这些节点的任务后,我们发现失败的任务总是发送到特定的节点。我们尝试使用airflow test命令在其他节点上运行该任务,并且它们都可以正常工作。最终,原因是该特定节点中存在错误的Python包。


1
我遇到了类似的问题。你能指出是哪个包引起的吗? - Jiafan Zhang
1
@JiafanZhang 我忘记是哪个包了。实际上这取决于你的代码。如果你的代码使用了一个类,但它所依赖的包没有在其中一个工作进程中进行 'pip install',那么就会出现异常。 - GodBlessYou
1
我曾经遇到过同样的问题,但是上述解决方案解决了它。 在我的情况下,任务(让我们称之为task1)在一个工作节点(让我们称之为worker1)中失败。除了worker1外,所有工作节点都有task1日志可供查看。 这表明worker1出了一些问题。 我通过运行“airflow test dag_id task_id execution_date”来重新验证它,并且它抛出了错误消息。 - Ganesh
我的一名工作人员的硬盘已经满了。Airflow 工作正常,任务正在其他工作人员上运行,但是那些分配给有问题节点的任务会因为以上错误而失败。解决所有工作人员的问题,这样就不会再出现这种情况了。 - babis21

1
在MWAA(AWS托管的Airflow)中,我们通过在MWAA环境中添加一个Airflow配置选项来解决了这个问题,其中配置选项"celery.worker_autoscale"被设置为"5,5"(请注意,我们有一个mw1.medium,最大并发任务数为10,我们的Airflow版本是2.2.2,我们大约有20多个并行运行的"作业",每个作业有6个任务)。我们在this git issue中找到了解决方法。
配置项"celery.worker_autoscale"是启动worker时将使用的最大和最小并发数。我认为问题的实质是,Celery在一个worker上运行了最大数量的进程(传感器、任务等),而另一个worker上运行了0个进程,此时它会发送错误消息"Executor reports task instance finished (failed) although the task says its queued."我认为这样更好地分配了Celery必须在其worker上运行的进程,这有助于记录每个进程的状态。Airflow使用这些日志来识别任务之间的状态转换。
更多关于Airflow和MWAA配置的信息,请参阅这篇中等文章。

0
我遇到了同样的问题,并且通过在配置JSON(可选)部分中使用{ "donot_pickle": "True" } JSON键值对来解决了它。

enter image description here


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接