Our Airflow installation uses the CeleryExecutor. The concurrency settings are as follows:
[core]
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 16
# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16
# Are DAGs paused by default at creation
dags_are_paused_at_creation = True
# When not using pools, tasks are run in the "default pool",
# whose size is guided by this config element
non_pooled_task_slot_count = 64
# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16
[celery]
# This section only applies if you are using the CeleryExecutor in
# [core] section above
# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor
# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
celeryd_concurrency = 16
We have a DAG that runs once a day. It runs several tasks in parallel, following a pattern that senses whether data exists in HDFS, sleeps for 10 minutes, and finally uploads the data to S3.
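For reference, the pattern looks roughly like the minimal sketch below, written against the Airflow 1.10-era API. The HDFS path, connection IDs, bucket name, and the upload_to_s3 helper are illustrative placeholders, not our actual code:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.hdfs_sensor import HdfsSensor

def upload_to_s3(**context):
    # Placeholder upload step: copy a staged file to S3.
    S3Hook(aws_conn_id='aws_default').load_file(
        filename='/tmp/staging/data.csv',  # placeholder local path
        key='etl/data.csv',                # placeholder S3 key
        bucket_name='example-bucket',      # placeholder bucket
        replace=True,
    )

dag = DAG(
    'example_dag',
    start_date=datetime(2019, 5, 1),
    schedule_interval='@daily',
    default_args={'retries': 3, 'retry_delay': timedelta(minutes=5)},
)

# Poll HDFS every 10 minutes until the data lands.
sense = HdfsSensor(
    task_id='task1',
    filepath='/data/partition/_SUCCESS',  # placeholder HDFS path
    poke_interval=600,
    dag=dag,
)

upload = PythonOperator(
    task_id='upload_to_s3',
    python_callable=upload_to_s3,
    provide_context=True,
    dag=dag,
)

sense >> upload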
Some of these tasks are hitting the following error:
2019-05-12 00:00:46,212 ERROR - Executor reports task instance <TaskInstance: example_dag.task1 2019-05-11 04:00:00+00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally?
2019-05-12 00:00:46,558 INFO - Marking task as UP_FOR_RETRY
2019-05-12 00:00:46,561 WARNING - section/key [smtp/smtp_user] not found in config
The error occurs at random across these tasks. When it happens, the task instance's state is immediately set to up_for_retry, and there are no logs on the worker nodes. After a few retries, the tasks eventually execute and finish.
This problem sometimes causes significant ETL delays. Does anyone know how to resolve it?