RabbitMQ如何向消费者发送消息?

11
我是一个RabbitMQ新手,因此需要有一个基本问题的指引:
RabbitMQ是按照消息到达的顺序将消息发送给消费者吗?还是在消息可用时将消息发送给消费者?
在消息消耗端点,我正在使用com.rabbitmq.client.QueueingConsumer。通过查看Sprint客户端源代码,我可以找出:
QueueingConsumer保持监听套接字以获取代理发送给它的任何消息。
收到的任何消息都会被解析并存储为Delivery,包含在QueueingConsumer中的LinkedBlockingQueue中。
这意味着即使消息处理端点很忙,消息也会被推送到QueueingConsumer中。
我的理解正确吗?

这更像是“拉队列还是推队列”的问题吗? - Kenji Noguchi
RabbitMQ是否会在消息到达时将其发送给消费者? 或者 RabbitMQ是否会在消息可用时将其发送给消费者? - Amit Sharma
4个回答

10

TLDR:当预取计数被超过时,您需要从RabbitMQ轮询消息,此时您将阻塞并仅接收心跳帧,直到获取的消息被确认。因此,您可以轮询,但如果未确认的消息数量小于预取计数,则只会获得新消息。新消息放置在队列消费者中,在理论上,您不应该在该队列消费者内部队列中拥有比预取计数多得多的消息。

详情:从低层面来看(我可能会弄错一些内容),RabbitMQ本身实际上并没有推送消息。客户端必须根据AMQP协议连续读取连接中的帧。很难将其分类为推送或拉取,但只需知道客户端必须连续读取连接,并且由于Java客户端可悲地是BIO,因此它是一个阻止/轮询操作。阻止/轮询基于AMQP心跳帧和常规帧以及套接字超时配置。

在Java RabbitMQ客户端中发生的情况是,每个通道(或者可能是每个连接)都有一个线程,该线程循环从RabbitMQ收集帧,这些帧最终成为命令,然后放入一个阻塞队列中(我认为它就像一个同步队列,即交接队列,但Rabbit有自己特殊的队列)。

QueueingConsumer是一个更高级别的API,并且会从早期提到的交接队列中拉取命令,因为如果将命令留在交接队列上,它将阻止通道帧收集循环。这可能不利于超时连接。此外,QueueingConsumer允许在单独的线程中完成工作,而不是与之前提到的循环帧线程位于相同的线程中。

现在,如果您查看大多数消费者实现,您可能会注意到它们几乎总是无限制的阻塞队列。我不完全确定为什么这些队列的限制不能是预取的倍数,但如果它们小于预取计数,则肯定会导致连接超时问题。


5

0

Rabbitmq主要使用推送机制。轮询将消耗服务器的带宽。轮询还存在每次轮询之间的时间间隔。这将无法实现低延迟。一旦队列中有消费者可用,Rabbitmq将向客户端推送消息。因此,连接是长期运行的。在rabbitmq中,ReadFrame基本上是等待传入的帧。


0

消费者可以使用get或consume
get:获取单个消息。控制权在消费者手中。根据业务逻辑,它可以随时调用get。
consume:消费者注册回调函数。当RabbitMQ可用时,发送消息。通过心跳保持连接活动。consume的另一个特点是,可以按消费者指定的批次发送消息。
默认情况下,消息会一直保留在队列中,直到消费者确认。消费者可以自动确认或以编程方式确认。在未收到确认之前,消息处于未确认状态。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接