从RabbitMq消费未确认的消息

18

我创建了一个简单的发布者和订阅者,使用basic.consume在队列上进行订阅。

当任务正常运行时,我的消费者会确认消息。当我遇到异常时,我不会确认该消息并尽快返回。已确认的消息从队列中消失,所以这部分功能正常。
现在,我想让消费者重新处理失败的消息,但唯一的方法是重新启动消费者。

我需要如何解决这个问题?

配置代码

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);

$exchange->setName('my-exchange');
$exchange->setType('fanout');
$exchange->declare();

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declare();
$queue->bind('my-exchange');

消费者代码

$queue->consume(array($this, 'callback'));

public function callback(AMQPEnvelope $msg)
{
    try {
        //Do some business logic
    } catch (Exception $ex) {
        //Log exception
        return;
    }
    return $queue->ack($msg->getDeliveryTag());
}

制造商代码

$exchange->publish('message');

你使用哪种编程语言?能否提供一些代码? - pinepain
@zaq178miami,请查看我的编辑后的消息。 - Bram Gerritsen
2个回答

28
如果消息未被确认并且应用程序失败,它将被自动重新传送,并在信封上设置redelivered属性为true(除非您使用no-ack = true标志消耗了它们)。
更新:
在catch块中使用重传标志nack消息。
    try {
        //Do some business logic
    } catch (Exception $ex) {
        //Log exception
        return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE);
    }

在RabbitMQ和AMQP协议中,当重传计数未实现时,请注意无限重发消息的问题。

如果您不想处理这样的消息并希望添加一些延迟,可以在调用nack方法之前添加sleep()usleep(),但这不是一个好主意。

有多种技术可用于解决循环重传问题:

1. 依赖死信交换机

  • 优点:可靠、标准、清晰
  • 缺点:需要额外的逻辑

2. 使用每条消息或每个队列的TTL

  • 优点:易于实现,也是标准的、清晰的
  • 缺点:对于长队列可能会丢失一些消息

示例(请注意,对于队列ttl,我们只传递数字,而对于消息ttl,则传递任何将成为数字字符串的内容):

2.1 每条消息的ttl:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish(
    'message at ' . microtime(true),
    null,
    AMQP_NOPARAM,
    array(
        'expiration' => '1000'
    )
);

2.2. 每个队列的ttl:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->setArgument('x-message-ttl', 1000);
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish('message at ' . microtime(true));

3. 在消息正文或头中包含保持重新投递计数或剩余重新投递次数(也称为IP堆栈中的跳数限制或TTL)

  • 优点:在应用程序级别上为您提供额外的消息生存期控制
  • 缺点:修改消息并重新发布时存在显着的开销,应用程序特定,不清楚

代码:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish(
    'message at ' . microtime(true),
    null,
    AMQP_NOPARAM,
    array(
        'headers' => array(
            'ttl' => 100
        )
    )
);

$queue->consume(
    function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) {
        $headers = $msg->getHeaders();
        echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' ';
        echo $msg->getDeliveryTag(), ' ';
        echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' ';
        echo $msg->getBody(), PHP_EOL;

        try {
            //Do some business logic
            throw new Exception('business logic failed');
        } catch (Exception $ex) {
            //Log exception
            if (isset($headers['ttl'])) {
                // with ttl logic

                if ($headers['ttl'] > 0) {
                    $headers['ttl']--;

                    $exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers));
                }

                return $queue->ack($msg->getDeliveryTag());
            } else {
                // without ttl logic
                return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue
            }

        }

        return $queue->ack($msg->getDeliveryTag());
    }
);

可能有其他更好的方法来更好地控制消息重新传递流程。

结论:没有万能的解决方案。您必须决定哪种解决方案最适合您的需求,或者找到其他解决方案,但不要忘记在这里分享;)


感谢您的回答。redelivered确实设置为true,但我必须重新启动我的阻塞消费者以重新消费该消息。 - Bram Gerritsen
谢谢,这正是我需要的。你能给我一些方向/建议,如何防止无限次重新传递的消息吗?如果我可以将重新排队到队列的延迟时间延长几秒钟,那就太好了,这样我就不会过载我的消费服务器。 - Bram Gerritsen
感谢您提供如此清晰明了的更新,这将成为其他遇到类似问题的人非常好的资源。我已经开始实施使用DLX并且现在它已经正常工作了。它的表现完全符合我的期望。 - Bram Gerritsen
死信交换机 ;) - Bram Gerritsen

1
如果您不想重启消费者,则basic.recover AMQP命令可能是您想要的。根据AMQP协议
basic.recover(bit requeue)

Redeliver unacknowledged messages.

This method asks the server to redeliver all unacknowledged messages on a specified channel. 
Zero or more messages may be redelivered. This method replaces the asynchronous Recover. 

这个方法似乎不是我正在使用的客户端API的一部分。http://www.php.net/manual/en/book.amqp.php - Bram Gerritsen
1
RabbitMQ 对该方法有部分支持,请参阅官方文档 - pinepain

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接