Celery工作者数据库连接池

59

我使用独立的Celery(不在Django中)。我计划在多台物理机器上运行一个工作任务类型。该任务执行以下操作:

  1. 接受XML文档。
  2. 进行转换。
  3. 进行多个数据库读取和写入。

我正在使用PostgreSQL,但同样适用于使用连接的其他存储类型。过去,我使用过数据库连接池来避免在每个请求上创建新的数据库连接或避免保持连接太长时间。然而,由于每个Celery worker运行在单独的进程中,我不确定它们实际上如何能够共享池。我错过了什么吗?我知道Celery允许您将返回的结果保留在Celery worker中,但我这里不是要做这件事。每个任务可以根据处理的数据执行多个不同的更新或插入。

从Celery worker内部访问数据库的正确方法是什么?

是否可能在多个workers/tasks之间共享池或者有其他方式可以做到这一点?


1
你解决了吗?我对解决方案很感兴趣。 - kev
1
我选择每个工作进程使用一个数据库连接。 - oneself
@oneself 如果您接受一个答案的话会很好。 - ThatAintWorking
1
你好,你是如何为每个工作进程获取一个数据库连接的呢?我对这个解决方案很感兴趣。 - Venkat Kotra
6个回答

44

我喜欢 tigeronk2 的每个 worker 一条连接的想法。正如他所说,Celery 维护自己的 worker 池,因此并不需要单独的数据库连接池。在 Celery Signal 文档 中有解释如何在创建 worker 时进行自定义初始化,所以我在我的 tasks.py 中添加了以下代码,它似乎像你期望的那样工作。我甚至能够在关闭 worker 时关闭连接:

from celery.signals import worker_process_init, worker_process_shutdown

db_conn = None

@worker_process_init.connect
def init_worker(**kwargs):
    global db_conn
    print('Initializing database connection for worker.')
    db_conn = db.connect(DB_CONNECT_STRING)


@worker_process_shutdown.connect
def shutdown_worker(**kwargs):
    global db_conn
    if db_conn:
        print('Closing database connectionn for worker.')
        db_conn.close()

1
我有点困惑。这不是在模块级别上创建连接,因此对于每个celery worker都会创建一个连接吗?如果您从任务内部访问全局的db_conn,则会在多个进程或线程中重复使用相同的连接(取决于并发设置)。它将启动许多连接,但只是用每个新的worker覆盖连接。我错过了什么吗? - Gijs
1
没错,这就是重点:每个工作进程一个连接。这是针对一组工作进程的解决方案,在这种情况下,全局db_conn仅在进程上下文中是全局的,因此它可以很好地运行。已经过了一段时间,所以我不记得是否费心用线程进行测试了。 - ThatAintWorking
1
@ThatAintWorking,我使用pyodbc非常顺利地完成了这个。 - Deekane
4
缺点是,如果你有固定数量的工作者,闲置的工作者的数据库连接可能会变得陈旧和无效。 - Madhur Ahuja
1
我使用了同样的方法,而不是使用全局变量,我将我的对象存储在app.conf['my_object'] = MyObject()中,然后使用任务。 - Muhammad Faizan Fareed

3

每个工作进程使用一个数据库连接。由于celery本身维护了一个工作进程池,因此您的数据库连接将始终等于celery工作者的数量。 反过来说,它将会将数据库连接池与celery工作进程管理绑定在一起。但是,鉴于GIL只允许一个线程在一个进程中运行,这应该是可以接受的。


1
“[...] 由于Celery本身维护了一组工作进程 [...] 您有文档链接吗?” - kev
啊,抱歉,我更关注于“每个工作进程一个数据库连接”的部分。但是这似乎不是文档的一部分。非常烦人,对我来说似乎是非常重要的事情。 - kev
@user1252307,你可以在文档中找到这种操作的位置。请参考我的回答。 - ThatAintWorking

2

您可以在celery配置中覆盖默认行为,使用线程化的worker而不是每个进程一个worker:

CELERYD_POOL = "celery.concurrency.threads.TaskPool"

然后您可以将共享池实例存储在任务实例中,并从每个线程化任务调用中引用它。


4
Python线程不是人们通常试图避免使用的东西吗? - oneself
取决于你想做什么。对于I/O绑定的进程,Python中的线程工作得很好。只有当你需要大量CPU时才可能遇到GAL的问题。 - ThatAintWorking

1

0

也许你可以使用pgbouncer。对于celery来说,不应该有任何变化,并且连接池是在进程外部完成的。我有同样的问题

(“也许”是因为我不确定是否会有任何副作用)


一个副作用是遇到SQLAlchemy每个进程的最大连接默认值 https://docs.sqlalchemy.org/en/14/errors.html#error-3o7r - Alan Hamlett

0

通过实现和监控来贡献我的发现。

欢迎反馈。

参考: 使用池http://www.prschmid.com/2013/04/using-sqlalchemy-with-celery-tasks.html

每个工作进程(由-c k指定的预派生模式)将建立一个新的连接到DB,而不使用池或重用。 因此,如果使用池,则池仅在每个工作进程级别上可见。 因此,池大小> 1没有用,但重用连接仍然可以节省打开和关闭的连接。

如果每个工作进程使用一个连接,则在初始化阶段会为每个工作进程(prefork模式celery -A app worker -c k)建立1个DB连接。 从重复打开和关闭中节省连接。

无论有多少个工作线程(eventlet),每个工作线程(celery -A app worker -P eventlet)都只建立一个连接到DB,而不使用池或重用。 因此,对于eventlet,一个celery进程(celery -A app worker ...)上的所有工作线程(eventlet)在任何时刻都有1个db连接。

根据celery文档

但是你需要确保你的任务不执行阻塞调用,因为这会使工作线程中的所有其他操作停止,直到阻塞调用返回。这可能是由于MYSQL数据库连接方式的阻塞调用导致的。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接