Celery工作进程 - 连接已关闭错误

3

我正在使用Flask和远程celery worker,为了进行celery通信,我正在使用rabbitmq作为消息代理。远程celery worker会随机抛出以下错误:

[2020-09-03 13:49:59,390: CRITICAL/MainProcess] Couldn't ack 20, reason:RecoverableConnectionError(None, 'connection already closed', None, '')
Traceback (most recent call last):
  File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\kombu\message.py", line 131, in ack_log_error
    self.ack(multiple=multiple)
  File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\kombu\message.py", line 126, in ack
    self.channel.basic_ack(self.delivery_tag, multiple=multiple)
  File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\amqp\channel.py", line 1394, in basic_ack
    spec.Basic.Ack, argsig, (delivery_tag, multiple),
  File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\amqp\abstract_channel.py", line 56, in send_method
    raise RecoverableConnectionError('connection already closed')
amqp.exceptions.RecoverableConnectionError: connection already closed

我正在使用celery 4版本。 有什么方法可以避免这个错误吗?

1个回答

5

我在使用 celery 版本 4.4.6 和 rabbitmq 运行很长时间的任务时遇到了同样的问题。然后我通过以下配置更改运行相同的任务,现在它可以正常工作(我以独立模式运行 worker)。重要的配置似乎是经纪人心跳:https://www.rabbitmq.com/heartbeats.html。这应该禁用心跳,并且连接不应由于错过的心跳而被重置。

CELERY_BROKER_HEARTBEAT = 0

关于 Celery 的文档链接:https://docs.celeryproject.org/en/v4.4.6/userguide/configuration.html#std:setting-broker_heartbeat

与 Flask 的集成应该像这样工作:

from flask import Flask
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] =
                            'amqp://myuser:mypassword@localhost:5672/myvhost'
app.config['CELERY_BROKER_HEARTBEAT'] = 0

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

我可以推荐这里的博客(代码片段来源):https://blog.miguelgrinberg.com/post/using-celery-with-flask

我应该在哪里设置这个 CELERY_BROKER_HEARTBEAT = 0。你是在任何配置文件中进行设置吗? - sattva_venu
请阅读文档:https://docs.celeryproject.org/en/v4.4.6/userguide/configuration.html 在我的情况下,这是一个 Django 应用程序,该变量设置在 settings.py 文件中。 - sidi7
我正在使用Flask,所以这个选项应该在Celery实例中设置吗? 例如: celery_app = Celery(backend='<backend'>, broker='<broker>', CELERY_BROKER_HEARTBEAT = 0) - sattva_venu
我为Flask添加了一些代码。请查看引用的博客,了解有关将Celery与Flask集成的更多信息。 - sidi7
2
可以确认这是解决方案。对于celery 5x,请执行以下操作:broker_pool_limit = None task_acks_late = True broker_heartbeat = 0 worker_prefetch_multiplier = 1 - Math is Hard
@MathisHard,你使用的是哪个RabbitMQ版本来实现这些与celery 5x相关的结果? - Lyndon Alcock

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接