Airflow调度器在任务量大时无法调度(或调度缓慢)

3

我正在使用Google Cloud Composer上的Airflow(版本:composer-1.10.2-airflow-1.10.6)。

我发现当任务过多时调度程序不会安排任务(请参阅下面的甘特图)

Gantt View (不要注意颜色,红色任务是“createTable Operators”,如果表已经存在,则会失败5次,然后才运行DAG的下一部分(重要的一部分))

有几个小时的间隔在任务之间!(例如,早上10点到下午3点之间有5个小时没有任何任务执行)

通常情况下,每个DAG大约有100-200个任务,使用40个左右的DAG正常工作(有时会更多)。但最近我添加了2个有很多任务(每个约5000个)的DAG,并且调度程序非常慢或不安排任务。在屏幕截图中,我于下午3点暂停了这2个带有大量任务的DAG,然后调度程序又恢复正常工作了。

您有任何解决方案吗?

Airflow旨在处理“无限量”的任务。

以下是我的环境信息:

  • 版本:composer-1.10.2-airflow-1.10.6
  • 集群规模:6(12个vCPU,96GB内存)

以下是Airflow配置的一些信息:

╔════════════════════════════════╦═══════╗
║ Airflow parameter              ║ value ║
╠════════════════════════════════╬═══════╣
║ -(celery)-                     ║       ║
║ worker_concurrency             ║ 32    ║
║ -(webserver)-                  ║       ║
║ default_dag_run_display_number ║ 2     ║
║ workers                        ║ 2     ║
║ worker_refresh_interval        ║ 60    ║
║ -(core)-                       ║       ║
║ max_active_runs_per_dag        ║ 1     ║
║ dagbag_import_timeout          ║ 600   ║
║ parallelism                    ║ 200   ║
║ min_file_process_interval      ║ 60    ║
║ -(scheduler)-                  ║       ║
║ processor_poll_interval        ║ 5     ║
║ max_threads                    ║ 2     ║
╚════════════════════════════════╩═══════╝

感谢您的帮助。

编辑:

我的26个DAG由一个.py文件创建,通过解析一个大型JSON变量创建所有DAG和任务。

也许问题就出在这里,因为今天Airflow正在调度除我描述的26个DAG(尤其是那两个大型DAG)之外的其他DAG中的任务。 更具体地说,Airflow有时会安排我的26个DAG的任务,但它更容易和更频繁地安排其他DAG的任务。


你的集群健康状况如何?你有没有注意到内存或处理方面出现了一些问题? - rmesteves
我的集群似乎很健康。我没有注意到任何奇怪的事情。 - Bibimrlt
你能检查一下你的项目的配额吗?https://cloud.google.com/composer/quotas - rmesteves
配额方面一切都很好。我认为问题不在这里,因为没有任何事情被安排了很长时间。 - Bibimrlt
2个回答

4
高的任务间延迟通常表明调度器存在瓶颈问题(与工作相关的问题不同)。即使反复运行相同的DAG,Composer 环境仍然可能出现性能瓶颈,因为每次工作分配可能会不同,或者可能有不同的进程在后台运行。
首先,建议增加可供调度程序使用的线程数(scheduler.max_threads),然后确保您的调度程序没有消耗掉其所在节点的所有 CPU。您可以通过确定调度程序所在位置,然后在 Cloud Console 中检查节点的 CPU 指标来检查调度程序所在节点的 CPU 情况。要查找节点名称:
# Obtain the Composer namespace name
kubectl get namespaces | grep composer

# Check for the scheduler
kubectl get pods -n $NAMESPACE -o wide | grep scheduler

如果以上方法没有帮助,那么调度程序有可能是故意在一个条件上阻塞。要查看调度程序在检查要运行的任务时评估的所有条件,请设置core.logging_level=DEBUG。在调度程序日志中(您可以在 Cloud Logging 中过滤),然后检查所有条件是否通过或失败以决定任务的运行或排队。


CPU使用率似乎也很好。我已经多次更改了调度程序的max_threads,但我认为这并没有起到任何作用。目前,我有4个线程,因为我的节点中有2个虚拟核心。还有一件事。我有26个DAG是由单个.py文件动态生成的。我将其分成了26个.py文件(每个文件一个DAG),调度程序似乎响应更好了。 - Bibimrlt

2
我认为你应该升级到 Composer 版本 1.10.4,拥有最新的补丁总是有帮助的。
你正在使用哪个数据库?拥有所有这些失败的任务是极不明智的。你可以使用CREATE TABLE IF NOT EXISTS ...吗?

升级没问题,这总是更好的选择。对于“CREATE TABLE”,我理解你的观点,我肯定会进行更改(但这不是问题所在);我正在使用Airflow中的本地BigQueryCreateEmptyTableOperator,它负责所有这些失败。 - Bibimrlt
如果我理解正确,每个创建表的任务都会运行5次,这非常浪费资源。我知道这不是这种情况的确切问题,但是删除每个表的这4个任务将导致任务数量大大减少。由于您正在处理多个DAG,所有任务同时运行,因此即使在此之后可能还需要另一步操作,这也会有所帮助。我建议您将retries=0添加到这些任务中,或者通过编写完整的CREATE TABLE IF NOT EXISTS ...命令来重新创建任务,虽然这很耗时,但可能是值得的。 - parakeet
1
我同意这不是我们讨论的主题,但是试图在不解决多余任务的情况下解决问题也不是最好的方法。而且,你说得对,BigQueryCreateEmptyTableOperator应该能够本地处理这个问题! - parakeet
根据您的建议,我更改了那些令人烦恼的失败任务!这确实更好。 - Bibimrlt
不,CreateTable任务不再无缘无故地失败了,所以速度更快了。 对于调度程序问题,我将线程增加到max_threads=4,并将之前一个单独的.py文件生成的26个DAG拆分成了26个.py文件。现在调度程序的工作明显更好了。 - Bibimrlt
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接