RabbitMQ死信交换机无法接收消息

61

我正在尝试设置第一个RabbitMQ死信交换机,以下是我通过Web管理界面使用的步骤:

  1. 创建名称为“dead.letter.test”的新DIRECT交换机
  2. 创建新队列“dead.letter.queue”
  3. 将“dead.letter.queue”绑定到“dead.letter.test”
  4. 使用死信交换机设置为“dead.letter.test”创建新队列“test1”
  5. 向“test1”发送一条消息
  6. 在“test1”中使用Nack(requeue = false)拒绝该消息

我期望这些步骤应该通过“dead.letter.test”交换机将记录放入“dead.letter.queue”中。但实际上没有发生这种情况。

我可以手动将消息放入“dead.letter.test”交换机,并且它会显示在“dead.letter.queue”中,因此我知道这一点没问题。

当我查看管理UI时,它显示DLX参数已在队列“test1”上设置。

我做错了什么?


你使用了哪个路由键? - theMayer
8个回答

90

Gentilissimo Signore很好心地在Twitter上回答了我的问题。问题在于,如果您的死信交换设置为DIRECT,则必须指定死信路由键。如果您只是想让所有NACK的消息进入一个死信桶以供以后调查(就像我一样),则应将死信交换设置为FANOUT。

这里是更新后的可行步骤:

  1. 创建名为“dead.letter.test”的新FANOUT交换机
  2. 创建新队列“dead.letter.queue”
  3. 将“dead.letter.queue”绑定到“dead.letter.test”
  4. 使用死信交换“dead.letter.test”创建新队列“test1”
  5. 将一条消息发送到“test1”
  6. 在“test1”中Nack该消息(requeue = false)

如果我想要一个带有单个交换机的死信队列,并基于路由键进行重定向,那么使用扇形交换机是否可行? - Karthik
K-lyer的问题解决方案:关于死信交换机的用法,唯一“特殊”的是你需要将其与x-dead-letter-exchange和可选的x-dead-letter-routing-key属性绑定。如果你想要在路由键上重定向,只需将其设置为直连交换机,并将你的队列绑定到它即可。 - Rick O'Shea
对于想要让这个程序在与直接交换和单一队列配合时工作的人,需要将队列绑定并将队列名称添加为路由键属性。 - wolf_codes

18

没有路由键且使用直接交换机的死信队列


按照以下步骤操作即可成功:
1. 创建一个名为'dead_queue'的新队列。
2. 创建一个名为'dead_exchange'的交换机,其类型应为'direct'。
3. 将'dead_queue'和'dead_exchange'绑定,不需要指定路由键。
4. 创建一个名为'test_queue'的新队列,并将它的'x-dead-letter-exchange'设置为'dead_exchange'。
5. 创建一个名为'test_exchange'的交换机,其类型应为'direct'。
6. 将'test_exchange'和'test_queue'进行绑定,不需要指定路由键。

最后我们将进行检查。首先在'test_exchange'上发布一条带有'expiration'参数设置为10000的消息。当在'test_exchange'上发布消息时,该消息将进入'test_queue',而当一个消息过期后,队列将搜索DLX参数(Dead Letter Exchange name),如果找到名称为'dead_exchange'的消息,则将该消息传递给'dead_exchange'并将其发送到'dead_queue'中.. 如果您仍有任何关于此的问题,请写下您的问题,我一定会查看它。谢谢。

注意:必须在'test_exchange'上发布消息,因为'test_queue'和'test_exchange'的绑定没有指定路由键,并且这将正常工作。但是,如果您在'default exchange'和'routing key'上发布消息,则该消息将不会到达目标队列,并且在消息过期后,队列会尝试使用默认的路由键将该死信消息传递到'dead_exchange',但消息将无法到达该队列。


你有Node.js代码可以参考吗?我正在尝试找到在发布者中定义死信交换的位置。我已经设置了一个主题交换机。 - user269867
@user269867 当然,我会研究一下。因为现在我正在使用Node.js工作。我们可以直接通过Skype交流sahil.gulati1991@outlook.com - Sahil Gulati

7
如果你想在死信交换机上使用自定义路由键,需要在声明工作队列时设置x-dead-letter-routing-key(在你的情况下是test1),否则将使用默认路由键。在你的情况下,RabbitMQ代理检测到循环并简单地丢弃被拒绝的消息。
你需要在test1队列上设置x-dead-letter-exchange=dead.letter.testx-dead-letter-routing-key=dead.letter.queue参数。

Zaq,感谢您的回复,我尝试添加了x-dead-letter-exchange和x-dead-letter-routing-key,但仍然无法将被nacked的消息放入dead letter exchange中。我的目标很简单:任何在“test1”队列中被nacked的消息都将被放入“dead.letter.test”交换机中,然后连接到该交换机的任何队列都将接收该消息。我需要自定义路由键来实现这个目标吗? - jhilden
请在问题中指定您使用的语言和库来处理AMQP代理。并添加一些可以重现您问题的代码。同时请指定RabbitMQ的版本。 - pinepain
此外,设置备用交换机,以防您的消息无法路由。其中一个可能的解决方案是在FANOUT交换机上进行实验,这肯定会将消息路由到任何地方。 - pinepain
@jhilden,你解决了你的问题吗?我也遇到了同样的问题。 - user269867

4

如果不是必须的话,就不需要创建FANOUT交换机。

您可以使用与其他交换机已经使用的相同路由键创建DIRECT交换机。而且不需要为新交换机创建新队列。您可以使用现有队列来处理新交换机。您只需要将新交换机与队列绑定即可。

这是我的receive.js文件:

var amqp = require("amqplib/callback_api");
var crontab = require('node-crontab');

amqp.connect("amqp://localhost", function (err, conn) {
conn.createChannel(function (err, ch) {
    var ex = 'direct_logs';
    var ex2 = 'dead-letter-test';
    var severity = 'enterprise-1-key';

    //assert "direct" exchange
    ch.assertExchange(ex, 'direct', { durable: true });
    //assert "dead-letter-test" exchange
    ch.assertExchange(ex2, 'direct', { durable: true });

    //if acknowledgement is nack() then message will be stored in second exchange i.e. ex2="dead-letter-test"
    ch.assertQueue('enterprise-11', { exclusive: false, deadLetterExchange: ex2 }, function (err, q) {
        var n = 0;
        console.log(' [*] Waiting for logs. To exit press CTRL+C');
        console.log(q);

        //Binding queue with "direct_logs" exchange
        ch.bindQueue(q.queue, ex, severity);
        //Binding the same queue with "dead-letter-test"
        ch.bindQueue(q.queue, ex2, severity);

        ch.consume(q.queue, function (msg) {
            // consume messages via "dead-letter-exchange" exchange at every second.
            if (msg.fields.exchange === ex2) {
                crontab.scheduleJob("* * * * * *", function () {
                    console.log("Received by latest exchange %s", msg.fields.routingKey, msg.content.toString());
                });
            } else {
                console.log("Received %s", msg.fields.routingKey, msg.content.toString());
            }

            if (n < 1) {
                // this will executes first time only. Here I'm sending nack() so message will be stored in "deadLetterExchange"
                ch.nack(msg, false, false);
                n += 1;
            } else {
                ch.ack(msg)
                n = 0
            }
        }, { noAck: false });
    });
  });
});

4

如果您希望所有队列都有相同的死信交换机,那么设置一个通用策略会更加容易:

sudo rabbitmqctl -p /my/vhost/path set_policy DLX ".*" '{"dead-letter-exchange":"MyExchange.DEAD"}' --apply-to queues

我非常推荐这种方法,因为它更加灵活,并且是rabbitmq团队推荐的方法。我实现了两种方法,策略方法导致代码复杂度大大降低,使得动态更改行为变得非常容易,而无需备份和重新创建队列。在此处查看为什么应该使用策略: https://www.rabbitmq.com/parameters.html#policies - Fredrik Corneliusson
我已经尝试了交换机和队列级别的策略,但是消息从未出现在我的死信交换机中,尽管如果我在声明队列时通过参数声明,则可以正常工作。我同意^^,这是一个更好的解决方案,最重要的是当我想要更新策略时,我不必重新声明/重新创建/迁移队列。 - Damien Roche

2

创建名为“dead.letter.test”的DIRECT交换机

正确

创建名为“dead.letter.queue”的队列

正确

将“dead.letter.queue”绑定到“dead.letter.test”上

正确

创建带有死信交换机设置为“dead.letter.test”的新队列“test1”

我假设您正在创建test1队列并将其绑定到dead.letter.test交换机上

向“test1”发送一条消息

如果您想要您的消息被dead.letter.queue接收,则在发送消息时必须提供路由键,并且使用dead.letter.queue的客户端也应该使用相同的路由键进行消费

如果您发布消息而未指定路由键,则只有订阅test1的客户端将接收到该消息。

如果您将消息发布到direct.letter.test交换机,则所有队列都将接收到该消息。它将像一个fanout交换机一样工作

因此,如果您想让dead.letter.queue接收消息,则必须将消息发布到该队列或在发布和订阅时使用相同的路由键并将消息发布到交换机


Atul,"test1"队列没有绑定到任何交换机,我只是直接发布到它进行测试。我的目标是,当来自“test1”的任何消息被nack时,将该消息放入“dead.letter.test”交换机中。 - jhilden
你需要将它绑定到dead.letter.test。顺便问一下,你是如何在没有提及交换机的情况下创建队列的? - Jack Daniel's
我的猜测是,如果您没有提到交换机,那么您的队列会自动绑定到默认交换机,因此您的消息会被发布到默认交换机。 - Jack Daniel's
Atul,你说得对。如果你不指定队列所在的交换机(这是有效的),那么当你直接向队列发送消息时,它会通过默认交换机进行传递。无论如何,如果一个消息在队列上被拒绝(无论它是如何进入该队列的),它都应该被发送到与原始队列(test1)相关联的死信队列中。 - jhilden
RabbitMQ中消息模型的核心思想是生产者永远不会直接向队列发送任何消息。实际上,很多时候生产者甚至不知道消息是否会被传递到任何队列。相反,生产者将消息发送到交换机。交换机是一个非常简单的东西,它从生产者那里接收消息并将其推送到队列中。交换机必须确切地知道如何处理接收到的消息。它应该附加到特定的队列吗?它应该附加到许多队列吗?还是应该被丢弃。 - Jack Daniel's

0

针对使用Spring-AMQP的用户

在我的情况下,问题是不同的。我想要一个死信交换机是直接类型的。我为队列设置了x-dead-letter-exchangex-dead-letter-routing-key。此外,在application.properties中我设置了spring.rabbitmq.listener.simple.default-requeue-rejected=false

看起来一切都很好,但是在调试时我注意到我的SimpleRabbitListenerContainerFactorydefaultRequeueRejected为null。原因是当您在@Configuration中声明SimpleRabbitListenerContainerFactory时,您创建了一个新的“非默认”bean。默认的bean会在幕后根据您的属性自动创建。但是您在@Config中的SimpleRabbitListenerContainerFactory不会读取这些属性,您必须自己读取并在Java代码中设置。

这种情况曾经发生在我身上,因为当我想要配置并发性时,我只是从Spring-AMQP文档中复制粘贴了配置。但你应该在一个地方完成所有操作,比如在属性中,像这样:

spring.rabbitmq.listener.simple.default-requeue-rejected=false
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10

或者完全使用Java,例如

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(5);
        factory.setMaxConcurrentConsumers(10);
        factory.setDefaultRequeueRejected(false);
        return factory;
    }

这两个是相同的。

我期望当我使用第二个选项(Java)时,它仍然可以从application.properties中获取属性,然后我可以在Java中进行自定义,但实际上并不是这样的。 是的,“复制粘贴”是邪恶的 :)


0
在我的情况下,问题是由于队列中有的。
ackMode="MANUAL"

但是我从未设置它(因为运行时异常),使用默认ACK代替。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接