具有长ETA的Celery任务与RabbitMQ

5

RabbitMQ 可以强制消费者执行确认超时: https://www.rabbitmq.com/consumers.html#acknowledgement-modes 默认情况下,如果任务在15分钟内未被确认,则整个节点将出现PreconditionFailed错误。

我需要安排一个celery任务(使用RabbitMQ作为代理),延迟时间较长(1-3小时),但是目前(使用celery 4和rabbitmq 3.8)当我尝试这样做时...在消费者确认超时配置为我的RMQ之后,我会收到PreconditionFailed错误。

是否有一种方法可以配置 ETA celery 任务在消费者确认超时之前被确认?

目前,我将consumer_timeout增加到大于我的ETA时间差,但肯定有更好的解决方案...


2
rabbitmq.config 中设置 {consumer_timeout, false} - jrhodin
很遗憾,这似乎对我不起作用:https://github.com/rabbitmq/rabbitmq-server/issues/3096 - Piotr Zakrzewski
2
如果您使用rabbitmq.config和经典格式,则可以这样做。 * /etc/rabbitmq/rabbitmq.config``` - jrhodin
你好,我也想做类似的事情,并尝试使用“rabbitmq-delayed-exchange-plugin”和“dead-letter-queue”。我写了一篇关于这两个的文章,并在下面提供了链接。希望对某些人有所帮助。使用dlx:https://medium.com/@anandhu.gopi97/how-to-schedule-celery-tasks-part-1-a1ea7b70b6a3使用RabbitMQ延迟消息插件:https://medium.com/@anandhu.gopi97/how-to-schedule-celery-tasks-part-2-320b40d39945 - Anandhu Gopi
5个回答

6

有一种方法可以通过在RabbitMQ服务器上运行以下命令来更改运行实例的consumer_timeout

rabbitmqctl eval 'application:set_env(rabbit, consumer_timeout, 36000000).'

这将把新的超时时间设置为10小时(36000000毫秒)。为了使其生效,您需要重新启动您的工人。现有的工作连接将继续使用旧的超时时间。

您也可以查看当前配置的超时值:

rabbitmqctl eval 'application:get_env(rabbit, consumer_timeout).'

如果您是通过Docker镜像运行RabbitMQ,则可以按如下设置该值:只需将 -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-rabbit consumer_timeout 36000000" 添加到您的 docker run 中,或将环境变量 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS 设置为"-rabbit consumer_timeout 36000000"即可。

希望这可以帮助您!


5

在Celery 5中,我认为调整consumer_timeout是您唯一的选择。请注意,这仅适用于RabbitMQ 3.8.15及更高版本。

另一个可能的解决方案是让worker在收到消息后立即确认(ack)。只有在您不需要保证任务完成时才这样做。例如,如果worker在执行任务前崩溃,Celery将不知道它没有完成任务。

对于延迟任务,在RabbitMQ中,最好的选择是使用延迟消息交换(delayed-message-exchange)或死信队列(dead lettering)。但Celery不能使用任何一种选项。在Celery中,消息被发布到消息代理,然后尽快发送到消费者。延迟由worker强制执行,而不是在代理处。


0

我遇到了同样的问题,但我已经解决了。

使用RabbitMQ版本3.8.14(3.8.14-management),我能够发送长时间的ETA任务。

我个人使用Celery发送具有长时间ETA的任务。 在我的情况下,我设置了celery添加超时(~consumer_timeout),我可以使用time_limit或soft_time_limit进行配置。


我认为这并没有解决所讨论的问题。time_limitsoft_time_limit是用于运行任务的,而讨论的问题是针对使用celery的ETACountdown功能的任务。这些任务由工作进程接管,但尚未被确认(因为它们尚未在给定的ETA下运行),因此出现了问题(新版本的RabbitMQ不喜欢这样!) - Sarang

0

0

我也想做类似的事情,并尝试使用“rabbitmq-delayed-exchange-plugin”和“dead-letter-queue”来实现。我写了一篇关于这两个插件的文章,并在下面提供了链接。希望对某些人有所帮助。简而言之,我们可以使用这两种方法来调度celery任务(处理长时间的ETA)。

使用dlx:

死信交换(DLX): enter image description here

使用RabbitMQ延迟消息插件:

RabbitMQ 延迟消息插件: 在此输入图片描述

附言:我知道 StackOverflow 的答案应该是自解释的,但将链接作为答案发布太长了。抱歉。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接