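I am trying to set up a three-node Airflow cluster. Each node runs an Airflow scheduler, an Airflow worker, and an Airflow webserver, together with Celery, a RabbitMQ cluster, and a multi-master Postgres cluster (implemented with Bucardo). The software versions are: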
- Airflow 2.0.1
- PostgreSQL 13.2
- Ubuntu 20.04
- Python 3.8.5
- Celery 4.4.7
- Bucardo 5.6.0
- RabbitMQ 3.8.2
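I ran into a problem when starting the Airflow schedulers.
When I start the first scheduler (with an empty database), it starts successfully. But when I start another scheduler on a different machine (I also tried starting it on the same machine), it fails with the following message: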
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "job_pkey"
DETAIL: Key (id)=(25) already exists.
[SQL: INSERT INTO job (dag_id, state, job_type, start_date, end_date, latest_heartbeat, executor_class, hostname, unixname) VALUES (%(dag_id)s, %(state)s, %(job_type)s, %(start_date)s, %(end_date)s, %(latest_heartbeat)s, %(executor_class)s, %(hostname)s, %(unixname)s) RETURNING job.id]
[parameters: {'dag_id': None, 'state': 'running', 'job_type': 'SchedulerJob', 'start_date': datetime.datetime(2021, 4, 21, 7, 39, 20, 429478, tzinfo=Timezone('UTC')), 'end_date': None, 'latest_heartbeat': datetime.datetime(2021, 4, 21, 7, 39, 20, 429504, tzinfo=Timezone('UTC')), 'executor_class': 'CeleryExecutor', 'hostname': 'hostname', 'unixname': 'root'}]
(Background on this error at: http://sqlalche.me/e/13/gkpj)
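After several start attempts, the scheduler eventually starts working. I assume the ID has been incremented in the meantime, so the row is then added to the database successfully: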
airflow=> select * from job order by state;
id | dag_id | state | job_type | start_date | end_date | latest_heartbeat | executor_class | hostname | unixname
----+--------+---------+--------------+-------------------------------+-------------------------------+-------------------------------+----------------+------------------------------+----------
26 | | running | SchedulerJob | 2021-04-21 07:39:22.243721+00 | | 2021-04-21 07:39:22.243734+00 | CeleryExecutor | machine name | root
25 | | running | SchedulerJob | 2021-04-21 07:39:14.515009+00 | | 2021-04-21 07:39:19.632811+00 | CeleryExecutor | machine name | root
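The assumption above (each failed attempt consumes an ID until a free one is reached) can be illustrated with a small simulation. This is only a sketch of one possible explanation, not a diagnosis of this cluster: it assumes that every node keeps its own local sequence for job.id while the rows themselves are replicated, and it relies on the fact that PostgreSQL does not roll back a sequence value consumed by a failed INSERT. The Node class, the replicate helper, and the starting sequence value 22 are all hypothetical.

```python
class UniqueViolation(Exception):
    """Toy stand-in for psycopg2.errors.UniqueViolation."""


class Node:
    """Toy model of one database node with its own local sequence (assumption)."""

    def __init__(self, name, seq=0):
        self.name = name
        self.seq = seq      # last value handed out by the local sequence
        self.rows = {}      # primary key -> payload

    def nextval(self):
        # Like a PostgreSQL sequence: the value is consumed even if the
        # INSERT that asked for it fails afterwards.
        self.seq += 1
        return self.seq

    def insert(self, payload):
        new_id = self.nextval()
        if new_id in self.rows:
            raise UniqueViolation(f"Key (id)=({new_id}) already exists.")
        self.rows[new_id] = payload
        return new_id


def replicate(src, dst):
    """Copy rows from src to dst without touching dst's sequence (assumption)."""
    for key, payload in src.rows.items():
        dst.rows.setdefault(key, payload)


def insert_with_retry(node, payload, max_attempts=10):
    """Retry the insert, counting how many attempts hit a duplicate key."""
    failed = 0
    for _ in range(max_attempts):
        try:
            return node.insert(payload), failed
        except UniqueViolation:
            failed += 1
    raise RuntimeError("no free id found")


node_a = Node("node_a")
for _ in range(25):                 # node_a ends up with ids 1..25
    node_a.insert("row")

node_b = Node("node_b", seq=22)     # hypothetical: local sequence lags behind
replicate(node_a, node_b)           # rows arrive, sequence stays at 22

new_id, failed_attempts = insert_with_retry(node_b, "SchedulerJob")
print(new_id, failed_attempts)
```

In this toy model the second node fails once for every replicated ID its local sequence has not yet passed, then succeeds on the first free ID, which is the same pattern as the failed starts followed by a successful one described above.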
Even when the second and subsequent schedulers do start successfully, a warning also appears for the log table:
WARNING - Failed to log action with (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "log_pkey"
DETAIL: Key (id)=(40) already exists.
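I understand why the scheduler cannot insert the data into the table, but how is this supposed to work correctly? How do I start multiple schedulers? The official documentation does not mention that any further configuration is needed. I hope my explanation is clear. Thanks!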
What is the value of the use_row_level_locking configuration parameter? - Iñigo González
use_row_level_locking = True. - Denis
postgres://user@localhost/database? Or like this: postgres://user@host1,host2,host3/database? - Iñigo González