Celery和RabbitMQ的超时和连接重置

7
我正在一台Windows 10机器上的Django应用程序中使用RabbitMQ 3.6.0和Celery 3.1.20。所有内容都在同一台计算机上运行。我已经将Celery配置为Acknowledge Late (CELERY_ACKS_LATE=True),但是现在出现了连接问题。
我启动了Celery worker,在处理任务50-60秒后,每个工作线程都会因以下消息而失败:

Couldn't ack ###, reason:ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None)

(###是任务编号)
当我查看RabbitMQ日志时,发现如下内容:

=INFO REPORT==== 10-Feb-2016::22:16:16 === accepting AMQP connection <0.247.0> (127.0.0.1:55372 -> 127.0.0.1:5672)

=INFO REPORT==== 10-Feb-2016::22:16:16 === accepting AMQP connection <0.254.0> (127.0.0.1:55373 -> 127.0.0.1:5672)

=ERROR REPORT==== 10-Feb-2016::22:17:14 === closing AMQP connection <0.247.0> (127.0.0.1:55372 -> 127.0.0.1:5672): {writer,send_failed,{error,timeout}}

这个错误恰好发生在Celery workers重置连接时。
我以为这是一个AMQP心跳问题,所以将BROKER_HEARTBEAT = 15添加到我的Celery设置中,但这没有任何改变。

你最终有没有偶然解决这个问题? - Jarad
在我的情况下,我在代理url的末尾有一个端口。移除该端口后问题得到解决。我认为Celery会自动处理端口,因为代理可能在不同的端口上运行,具体取决于可用性,硬编码端口并不理想。 - Patrick Mutuku
2个回答

7

我曾在Windows上使用Celery处理长时间运行的任务时遇到类似问题,但最终通过以下配置解决了这个问题:

CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1

我使用了 -Ofair 选项来启动 celery 工作进程守护进程:
celery -A test worker -l info -Ofair

在我有限的理解中,CELERYD_PREFETCH_MULTIPLIER设置了特定Celery工作进程队列中等待的消息数量。默认情况下,它设置为4。如果将其设置为1,则每个工作进程只会消耗一个消息并在消耗另一个消息之前完成任务。由于长时间运行的任务中与RabbitMQ的连接经常中断,我遇到了问题,但是如果任何其他消息/任务在celery队列中等待,则会重新尝试执行该任务。对于我的情况,还有以下选项:
CELERYD_CONCURRENCY = 1

将并发设置为1对我很有意义,因为我有需要大量RAM的长时间运行任务,所以它们需要单独运行。


感谢这些提示。我真的找不到其他能解决这个确切问题的东西。我遇到了与@zmbq相同的ConnectionResetError错误。我正在Windows上运行,也有一个solo并发池。我正在从Adwords API中提取数据。 - Jarad
自从这篇回答发布以来,Windows对Celery的支持已经被取消,因此请谨慎考虑这里的任何建议。 - bbaker
尝试在Windows上操作,但是仍然遇到了相同的问题。 - sattva_venu

2
@bbaker的解决方案使用CELERY_ACKS_LATE(在Celery 4x中为task_acks_late)并没有对我起作用。我的工作进程位于Kubernetes pod中,必须使用--pool solo运行,并且每个任务需要30-60秒。
我通过包含broker_heartbeat=0来解决这个问题。
broker_pool_limit = None
task_acks_late = True
broker_heartbeat = 0
worker_prefetch_multiplier = 1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接