来自RabbitMQ队列的批量消息

6
我有一个RabbitMQ集群中的请求流,多个消费者处理这些请求。问题在于,每个消费者出于性能原因必须批处理请求。具体而言,我可以通过批处理请求来分摊网络IO操作。
因此,每个消费者都希望最大化其可以批处理的请求数量,但不要增加太多延迟。
我可以在消费者收到第一个请求时启动计时器,并继续收集请求,直到以下两种情况之一发生:计时器超时或接收到500个请求。
是否有更好的方法实现此目标 - 而不阻塞每个消费者?
1个回答

6

一般而言,“批量处理消息”的网络方面是在basic.qos(prefetch-size, prefetch-count)参数的级别上处理的。在此方案中,代理将向消费者发送超出未确认消息的一些字节/消息(分别),但客户端库以逐个消息的方式将其分配给应用程序。

为了最大化收益,应用程序可以保留每条消息的basic.ack() ,并定期发出basic.ack(delivery-tag=n, multiple=True)以确认所有带有传递标记<=n的消息。


谢谢 ifLoop。您会推荐使用异步回调机制来检索消息,还是同步获取? - zonked.zonda
1
你应该优先使用 basic.consume 而不是 basic.get; prefetch 不适用于 basic.get。如果有意义的话,你应该以异步方式设计你的应用程序,因为 AMQP 更适合这种模型。 - SingleNegationElimination

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接