兔子消息队列将消息存储在交换机中。

4

我想了解在没有消费者运行的情况下是否可以将消息存储在RabbitMQ交换机中。

我理解(可能是不正确的),要实现这一点,交换机需要是“持久”的,队列也需要是“持久”的,并且消息需要使用“持久”标志发送。

'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT

我的主要目标是将所有消息存储在交换机中,以便在任何原因下没有消费者运行时,当我启动一个消费者时,所有交换机中的消息都可以被定向到绑定的队列。 我声明我的交换机和队列如下:

//Sender.php
public function sendToQueue(ActionMessage $message)
    {
        $headers = [
            'content-type' => 'application/json',
            'timestamp' => $message->getCreatedAt()->getTimestamp(),
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ];
        $channel = $this->connection->getChannel();
        $channel->exchange_declare($this->exchangeName, 'direct', false, true, false);
        $qMessage = new AMQPMessage(json_encode($message->toArray()), $headers);
        $channel->basic_publish($qMessage, $this->exchangeName, $message->getTopic());
        return true;
    }
//Receiver.php
public function consume($callbackFunction)
        {
            $channel = $this->messenger->getChannel();
            $channel->exchange_declare($this->exchange, 'direct', false, true, false);
            list($queueName, ,) = $channel->queue_declare('', false, true, true, false);
            $channel->queue_bind($queueName, $this->exchangeName, $this->topicAction);

            $channel->basic_consume($queueName, '', false, true, false, false, $callbackFunction);

            while (count($channel->callbacks)) {
                $channel->wait();
            }

            $channel->close();
            $this->messenger->close();
        }

我会感激任何帮助(即使只是放弃这个想法并在中间插入一些存储)。 谢谢。


RabbitMQ exchange - 我想你的意思是 queue,exchange 是一种像 topicfanout 等交付方式。这是一种分发消息的方法。如果您使队列持久化、不自动删除并需要 ACK,则可以将消息放入其中而没有消费者。 - ArtisticPhoenix
如果您启用了自动删除,队列将在没有消费者时被删除;如果您不使用确认机制,消息会直接通过队列而无需等待(持久化可能是可选项,但这并不是一个坏主意)。这只是我临时想到的,但我确信如果设置错误,它们可能会引起问题,原因很明显。对于交换机也可能是如此,但我从未尝试过在交换机中“保留”消息。 - ArtisticPhoenix
我做的一件事是,为我的所有工人提供了一个额外的(自动删除/无确认)队列和一个扇出交换机,我用它来向工人发送命令消息。例如,我可以发送一个命令consume:queue_name给他们,然后他们就会执行。此外,这也默认为他们提供了一个队列,并且告诉他们有关新队列的信息。基本上,每当客户提交作业时,它都会进入特定于他们的队列中,主要是为了防止他们独占所有工人,因为他们的所有消息都进入一个专门为他们服务的队列中。然后工人们只是轮流处理它。等等。这样可以有机地防止这种情况的发生。 - ArtisticPhoenix
@ArtisticPhoenix 非常感谢,我会查看的。 - rayvr
1个回答

3
一种交换机不存储消息,这是队列的工作。你遇到的问题不是没有运行的消费者,而是没有队列存在,因为你让消费者自己声明他们自己的队列。
如果你希望消息在等待消费者接收之前保持持久性,那么你应该声明:
- 一个“Sender”将发布到的交换机 - 一个命名队列,每种类型的消息都会单独消费(如果使用直连交换,则为每个路由键一个)
这两个可以在发送方脚本中声明,但在大多数情况下,当应用程序部署时声明它们更有意义,就像处理数据库模式一样。
然后,你可以连接到命名队列而不是在接收方脚本中创建匿名队列,并开始接收等待在那里的消息。
这将影响多个相同路由键的消费者之间的交互方式。
  • 单个交换机连接多个队列,就像您现有的代码一样,会创建每条消息的多个副本。如果您有不同的消费者对相同的消息执行不同的操作,则此功能很有用。
  • 单个队列连接多个消费者,就像我上面建议的那样,将会分享这些消息,每个消费者基本上都是随机处理一个消息。如果您有多个相同的消费者来处理大量的消息,则此功能很有用。

您可能会发现this RabbitMQ simulator很有用,以便可视化差异。

您可能会发现实际上需要混合使用以下两种机制:

  • 为每个必须查看每条消息的消费者预先声明一个队列,以确保存储每条消息的副本,直到该特定消费者准备读取它为止。
  • 在其他消费者中声明额外的临时队列,以获取额外的副本当消息进入时。

最后需要注意的是,RabbitMQ有两种机制可以回退到不同的消息处理方式,以处理无法处理的消息:

  • 一个备用交换机可以捕获那些本应被丢弃的消息(因为没有适当的队列绑定)。
  • 一个死信交换机可以捕获那些从队列中被丢弃的消息(例如因为被消费者拒绝或达到配置的超时时间)。

如果您不想正常处理漏掉的消息,只是想检测它们并将其列在错误日志中,那么备用交换机可能会对您有所帮助。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接