如果消息未被确认并且应用程序失败,它将被自动重新传送,并在信封上设置
redelivered
属性为
true
(除非您使用
no-ack = true
标志消耗了它们)。
更新:
在catch块中使用重传标志
nack
消息。
try {
//Do some business logic
} catch (Exception $ex) {
//Log exception
return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE);
}
在RabbitMQ和AMQP协议中,当重传计数未实现时,请注意无限重发消息的问题。
如果您不想处理这样的消息并希望添加一些延迟,可以在调用nack
方法之前添加sleep()
或usleep()
,但这不是一个好主意。
有多种技术可用于解决循环重传问题:
1. 依赖死信交换机
2. 使用每条消息或每个队列的TTL
- 优点:易于实现,也是标准的、清晰的
- 缺点:对于长队列可能会丢失一些消息
示例(请注意,对于队列ttl,我们只传递数字,而对于消息ttl,则传递任何将成为数字字符串的内容):
2.1 每条消息的ttl:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish(
'message at ' . microtime(true),
null,
AMQP_NOPARAM,
array(
'expiration' => '1000'
)
);
2.2. 每个队列的ttl:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->setArgument('x-message-ttl', 1000);
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish('message at ' . microtime(true));
3. 在消息正文或头中包含保持重新投递计数或剩余重新投递次数(也称为IP堆栈中的跳数限制或TTL)
- 优点:在应用程序级别上为您提供额外的消息生存期控制
- 缺点:修改消息并重新发布时存在显着的开销,应用程序特定,不清楚
代码:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish(
'message at ' . microtime(true),
null,
AMQP_NOPARAM,
array(
'headers' => array(
'ttl' => 100
)
)
);
$queue->consume(
function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) {
$headers = $msg->getHeaders();
echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' ';
echo $msg->getDeliveryTag(), ' ';
echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' ';
echo $msg->getBody(), PHP_EOL;
try {
throw new Exception('business logic failed');
} catch (Exception $ex) {
if (isset($headers['ttl'])) {
if ($headers['ttl'] > 0) {
$headers['ttl']--;
$exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers));
}
return $queue->ack($msg->getDeliveryTag());
} else {
return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE);
}
}
return $queue->ack($msg->getDeliveryTag());
}
);
可能有其他更好的方法来更好地控制消息重新传递流程。
结论:没有万能的解决方案。您必须决定哪种解决方案最适合您的需求,或者找到其他解决方案,但不要忘记在这里分享;)