Celery工作者中的事件循环已关闭。

5
我在使用异步ODM时遇到了多个问题,主要是在Celery工作器内。一开始我无法使用Celery工作器信号来初始化我的数据库模型,我正在使用Beanie进行数据库连接。 第一个实现方式
from asyncer import syncify
from asgiref.sync import async_to_sync 
client = AsyncIOMotorClient(
    DATABASE_URL, uuidRepresentation="standard" )
    db = client[DB_NAME]

async def db_session():
        await init_beanie(
        database=db,
        document_models=[Project, User],
    )
@worker_ready.connect
def startup_celery_ecosystem(**kwargs):
            logger.info('Startup celery worker process')
            async_to_sync(db_session)()
            logger.info('FINISHED : Startup celery worker process')
async def get_users():
    users = User.find()
    users_list = await users.to_list()
    return users_list

@celery_app.task
def pool_db():
    async_to_sync(get_users)()
    #syncify(get_users)() same error User class is not initialized yet (init_beanie should have already initialized all the models )

使用这种实现方式,我无法通过User和Project类访问我的数据库,并且会产生一个错误,就好像User和Project还没有被实例化一样。

解决方法是在模块级别调用db_session(),这可以解决数据库模型实例化的问题,但现在当查询数据库时,我的celery任务会出现以下错误:

运行时错误:事件循环已关闭

第二个实现

from asyncer import syncify
from asgiref.sync import async_to_sync client = AsyncIOMotorClient(
DATABASE_URL, uuidRepresentation="standard" )
db = client[DB_NAME]

async def db_session():
        await init_beanie(
        database=db,
        document_models=[Project, User],
    )
# now  init_beanie at module level
async_to_sync(db_session)()

async def get_users():
    users = User.find()
    users_list = await users.to_list()
    return users_list

@celery_app.task
def pool_db():
    # this raises the following Runtime error RuntimeError('Event loop is closed')
    async_to_sync(get_users)()
    #syncify(get_users)() same error 

我对asyncio的实现方式以及asyncer和asgiref如何允许在同步线程中运行异步代码并感到困惑,希望能得到帮助。


1
只想说我目前也遇到了这个问题。直到今天都从未发生过。 - Samuele B.
@SamueleB。对我来说,我刚开始着手一个项目,这是我第一次使用Celery,但它的文档不够完善,也没有与asyncio很好地集成。 - H-ADJI
1个回答

0
经过多次使用flower监控工作进程并记录工作ID(进程ID)的调查,发现Celery worker本身不处理任何任务,它会生成其他子进程(这是我的情况,因为我正在使用默认执行器池prefork),而信号(worker_ready.connect)仅在supervisor进程Celery worker上运行,而不在子进程上运行,并且由于进程在内存方面是隔离的,这意味着您无法从子进程访问db连接或任何初始化的资源。 现在我正在使用带有gevent的celery,它只生成1个进程,因为最初我的项目不需要CPU密集型任务,这意味着我不需要prefork池提供的所有CPU功率。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接