多个Airflow调度器

3

我正在尝试安装一个三节点的Airflow集群,每个节点都有Airflow调度器、Airflow工作器和Airflow Web服务器,还有Celery、RabbitMQ集群和Postgres多主集群(使用Bucardo实现)。软件版本如下:

  • Airflow 2.0.1
  • PostgreSQL 13.2
  • Ubuntu 20.04
  • Python 3.8.5
  • Celery 4.4.7
  • Bucardo 5.6.0
  • RabbitMQ 3.8.2

然后我遇到了启动Airflow调度器时的问题。

当我启动第一个调度器(数据库为空)时,它成功启动。但是,当我在另一台机器上启动另一个调度器时(我也试过在同一台机器上启动),它会失败并显示以下消息:

sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "job_pkey"
DETAIL:  Key (id)=(25) already exists.

[SQL: INSERT INTO job (dag_id, state, job_type, start_date, end_date, latest_heartbeat, executor_class, hostname, unixname) VALUES (%(dag_id)s, %(state)s, %(job_type)s, %(start_date)s, %(end_date)s, %(latest_heartbeat)s, %(executor_class)s, %(hostname)s, %(unixname)s) RETURNING job.id]
[parameters: {'dag_id': None, 'state': 'running', 'job_type': 'SchedulerJob', 'start_date': datetime.datetime(2021, 4, 21, 7, 39, 20, 429478, tzinfo=Timezone('UTC')), 'end_date': None, 'latest_heartbeat': datetime.datetime(2021, 4, 21, 7, 39, 20, 429504, tzinfo=Timezone('UTC')), 'executor_class': 'CeleryExecutor', 'hostname': 'hostname', 'unixname': 'root'}]
(Background on this error at: http://sqlalche.me/e/13/gkpj)


尝试启动几次之后,调度程序最终开始工作。我假设ID已经增加,然后数据成功添加到数据库中:
airflow=> select * from job order by state;
 id | dag_id |  state  |   job_type   |          start_date           |           end_date            |       latest_heartbeat        | executor_class |           hostname           | unixname 
----+--------+---------+--------------+-------------------------------+-------------------------------+-------------------------------+----------------+------------------------------+----------
 26 |        | running | SchedulerJob | 2021-04-21 07:39:22.243721+00 |                               | 2021-04-21 07:39:22.243734+00 | CeleryExecutor |                machine name  | root
 25 |        | running | SchedulerJob | 2021-04-21 07:39:14.515009+00 |                               | 2021-04-21 07:39:19.632811+00 | CeleryExecutor |                machine name  | root 

如果第二个及后续调度程序成功启动,日志表也会出现警告:

WARNING - Failed to log action with (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "log_pkey"
DETAIL:  Key (id)=(40) already exists.

我理解为什么调度程序无法将数据插入表中,但应该如何正确地工作呢?如何启动多个调度程序?官方文档没有提到需要进一步配置。希望我的解释非常清楚。谢谢!


请问您正在使用哪个Airflow版本,并且在所有服务器上use_row_level_locking配置参数的值是多少? - Iñigo González
是的,Airflow版本为2.0.1,在所有服务器上使用use_row_level_locking = True - Denis
你的连接字符串是这样的吗 postgres://user@localhost/database?还是像这样 postgres://user@host1,host2,host3/database - Iñigo González
我正在使用第一个连接字符串,因为我有Postgresql带有主-主复制并位于同一台机器上。 - Denis
2个回答

0

0

看起来Airflow调度程序和Bucardo之间存在竞争条件。

可能最简单的解决方法是使用类似于以下连接字符串在airflow.cfg中按顺序查询所有服务器(所有节点上都相同):

[core]
sql_alchemy_conn=postgresql://USER:PASS@/DB?host=node1:port1&host=node2B&host=node3

为了使其正常工作,您需要 sqlalchemy >= 1.3

为什么会发生这种情况

您的调度程序和 bucardo 之间存在竞争条件,尝试从不同的主机读取和写入表中的数据。更改不会像应该的那样快速传播,服务器对表的写入将失败。

即使您将所有节点视为“多主”,使所有节点首先查看同一服务器也可以解决此问题。在出现故障的情况下,它们将使用第二个节点。


谢谢回复!但我不这么认为。让我解释一下。我正在启动第一个调度程序,然后等待它启动,然后尝试启动另一个调度程序。 - Denis

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接