我想了解在没有消费者运行的情况下是否可以将消息存储在RabbitMQ交换机中。
我理解(可能是不正确的),要实现这一点,交换机需要是“持久”的,队列也需要是“持久”的,并且消息需要使用“持久”标志发送。
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
我的主要目标是将所有消息存储在交换机中,以便在任何原因下没有消费者运行时,当我启动一个消费者时,所有交换机中的消息都可以被定向到绑定的队列。 我声明我的交换机和队列如下:
//Sender.php
public function sendToQueue(ActionMessage $message)
{
$headers = [
'content-type' => 'application/json',
'timestamp' => $message->getCreatedAt()->getTimestamp(),
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
];
$channel = $this->connection->getChannel();
$channel->exchange_declare($this->exchangeName, 'direct', false, true, false);
$qMessage = new AMQPMessage(json_encode($message->toArray()), $headers);
$channel->basic_publish($qMessage, $this->exchangeName, $message->getTopic());
return true;
}
//Receiver.php
public function consume($callbackFunction)
{
$channel = $this->messenger->getChannel();
$channel->exchange_declare($this->exchange, 'direct', false, true, false);
list($queueName, ,) = $channel->queue_declare('', false, true, true, false);
$channel->queue_bind($queueName, $this->exchangeName, $this->topicAction);
$channel->basic_consume($queueName, '', false, true, false, false, $callbackFunction);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$this->messenger->close();
}
我会感激任何帮助(即使只是放弃这个想法并在中间插入一些存储)。 谢谢。
RabbitMQ exchange
- 我想你的意思是queue
,exchange 是一种像topic
或fanout
等交付方式。这是一种分发消息的方法。如果您使队列持久化、不自动删除并需要 ACK,则可以将消息放入其中而没有消费者。 - ArtisticPhoenixconsume:queue_name
给他们,然后他们就会执行。此外,这也默认为他们提供了一个队列,并且告诉他们有关新队列的信息。基本上,每当客户提交作业时,它都会进入特定于他们的队列中,主要是为了防止他们独占所有工人,因为他们的所有消息都进入一个专门为他们服务的队列中。然后工人们只是轮流处理它。等等。这样可以有机地防止这种情况的发生。 - ArtisticPhoenix