如何从AMQP(RabbitMQ)队列中有选择地删除消息?

16

我希望能够选择性地从AMQP队列中删除消息,而不必读取它们。

场景如下:

发送方想基于X类型的新信息到达这一事实过期X类型的消息。因为订阅者很可能还没有消费最新的X类型消息,所以发布者应该只删除之前的X类型消息,然后将最新的消息放入队列中。整个操作对订阅者应该是透明的 - 实际上,他应该使用诸如STOMP之类的简单方法来获取消息。

如何使用AMQP实现?或者在另一种消息传递协议中更方便?

我想避免复杂的基础设施。所需的整个消息传递过程就像上面那样简单:一个队列,一个订阅者,一个发布者,但是发布者必须具有根据给定条件临时删除消息的能力。

发布者客户端将使用Ruby,但实际上只要我发现如何在协议中完成此操作,我会处理任何语言。

4个回答

10
您不需要一个消息队列,而是需要一个键值数据库。例如,您可以使用Redis或Tokyo Tyrant获取一个简单的网络可访问的键值数据库。或者只需使用一个memcache。
每种消息类型都是一个键。当您使用相同的键写入新消息时,它将覆盖以前的值,因此该数据库的读取器永远无法获得过时的信息。
此时,您只需要一个消息队列来建立应该读取键的顺序(如果这很重要)。否则,只需不断地扫描数据库。如果您不断地扫描数据库,最好将数据库放在读取器附近以减少网络流量。
我可能会像这样做 key:typecode value:lastUpdated,important data 然后我会发送包含 typecode,lastUpdated 的消息。这样,读取器可以将该键的lastupdated与他们上次从数据库中读取的内容进行比较,并跳过读取它,因为它们已经是最新的。
如果您真的需要使用AMQP进行此操作,请使用RabbitMQ和自定义交换类型,具体来说是最后一个值缓存交换。示例代码在这里:https://github.com/squaremo/rabbitmq-lvc-plugin

8
目前在RabbitMQ(或更一般地说,在AMQP中)无法自动实现此功能。但是,这里有一个简单的解决方法。
假设您想发送三种类型的消息:X、Y和Z。如果我正确理解了您的问题,当X消息到达时,您希望代理忘记所有尚未传递的其他X消息。
在RabbitMQ中,这很容易做到:
- 生产者声明三个队列:X、Y和Z(它们会自动绑定到默认交换机,并以它们的名称作为路由键,这正是我们想要的), - 发布消息时,生产者首先清除相关队列(因此,如果它发布X消息,它首先清除X队列);这有效地删除了过时的消息, - 消费者只需从它想要的队列(X用于X消息,Y用于Y消息等)中消耗;从它的角度来看,它只需执行basic.get以获取下一个相关消息。
这意味着当两个生产者在大约相同的时间发送相同类型的消息时会出现竞争条件。结果是可能会有两个(或更多)消息同时出现在队列中,但由于消息数量受到生产者数量的上限限制,并且由于多余的消息会在下次发布时被清除,因此这不应该是什么大问题。
总之,这个解决方案比最佳解决方案多了一个额外的步骤,即在发布类型为X的消息之前清除队列X。
如果您需要设置此配置的任何帮助,请向rabbitmq-discuss邮件列表寻求建议。

在我的情况下,使用队列的唯一原因是X、Y、Z事件交错发生,订阅者应该按照它们到达的顺序读取它们——但只有最新的X、最新的Y和最新的Z。此外,类型的数量是数千个,因此订阅者不会监听数千个队列。 - Wojciech Kaczmarek
1
啊。同时监听成千上万个队列不是问题(几乎没有开销)。一旦消息进入队列,您实际上无法有选择性地删除它们。我想你需要的是支持原子移动操作的文件服务器:发布者开始将内容写入临时文件,完成后将该文件移动(重命名)到X;客户端然后只需读取文件X。 - scvalex

5

如果您只想从队列中删除前n个消息,则可以使用RabbitMQ Web-UI进行操作。

  • 从“Queues”选项卡中选择队列,滚动到“Get messages”部分
  • 设置参数“Requeue=No”,并设置要从队列中删除的消息数量
  • 按下“Get messages”按钮

3
这个问题因标题而备受关注。查看描述后,可以了解到更具体的场景。对于那些想要从队列中实际删除下一个(记住FIFO)消息的用户,您可以使用rabbitmqadmin并发出以下命令: rabbitmqadmin get queue=queuename requeue=false count=1 此命令实质上是消费该消息并且不做任何操作。如果需要带有备份消息标志的完整命令可能会像下面这样。请根据您的需求添加其他参数。 sudo python rabbitmqadmin -V virtualhostname -u user -p pass get queue=queuename requeue=false count=1 payload_file=~/origmsg

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接