Delayed queue implementation with PHP AMQP


I recently put together a quick implementation of a producer/consumer queue system.

<?php
namespace Queue;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;    

class Amqp
{
    private $connection;
    private $queueName;
    private $delayedQueueName;
    private $channel;
    private $callback;

    public function __construct($host, $port, $login, $password, $queueName)
    {
        $this->connection = new AMQPStreamConnection($host, $port, $login, $password);
        $this->queueName = $queueName;
        $this->delayedQueueName = null;
        $this->channel = $this->connection->channel();
        // First, we need to make sure that RabbitMQ will never lose our queue.
        // In order to do so, we need to declare it as durable. To do so we pass
        // the third parameter to queue_declare as true.
        $this->channel->queue_declare($queueName, false, true, false, false);
    }

    public function __destruct()
    {
        $this->close();
    }

    // Just in case : https://dev59.com/R3VC5IYBdhLWcg3w51hv
    // We should call close explicitly if possible.
    public function close()
    {
        if (!is_null($this->channel)) {
            $this->channel->close();
            $this->channel = null;
        }

        if (!is_null($this->connection)) {
            $this->connection->close();
            $this->connection = null;
        }
    }

    public function produceWithDelay($data, $delay)
    {
        if (is_null($this->delayedQueueName))
        {
            $delayedQueueName = $this->queueName . '.delayed';

            // First, we need to make sure that RabbitMQ will never lose our queue.
            // In order to do so, we need to declare it as durable. To do so we pass
            // the third parameter to queue_declare as true.
            $this->channel->queue_declare($this->delayedQueueName, false, true, false, false, false,
                new AMQPTable(array(
                    'x-dead-letter-exchange' => '',
                    'x-dead-letter-routing-key' => $this->queueName
                ))
            );

            $this->delayedQueueName = $delayedQueueName;
        }

        $msg = new AMQPMessage(
            $data,
            array(
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'expiration' => $delay
            )
        );

        $this->channel->basic_publish($msg, '', $this->delayedQueueName);
    }

    public function produce($data)
    {
        $msg = new AMQPMessage(
            $data,
            array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
        );

        $this->channel->basic_publish($msg, '', $this->queueName);
    }

    public function consume($callback)
    {
        $this->callback = $callback;

        // This tells RabbitMQ not to give more than one message to a worker at
        // a time.
        $this->channel->basic_qos(null, 1, null);

        // Requires ack.
        $this->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'consumeCallback'));

        while(count($this->channel->callbacks)) {
            $this->channel->wait();
        }
    }

    public function consumeCallback($msg)
    {
        call_user_func_array(
            $this->callback,
            array($msg)
        );

        // Very important to ack, in order to remove msg from queue. Ack after
        // callback, as exception might happen in callback.
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }

    public function getQueueSize()
    {
        // three tuple containing (<queue name>, <message count>, <consumer count>)
        $tuple = $this->channel->queue_declare($this->queueName, false, true, false, false);
        if ($tuple != null && isset($tuple[1])) {
            return $tuple[1];
        }
        return -1;
    }
}

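The combination of produce() and consume() works as expected.

However, when it comes to the delayed queue, the combination of produceWithDelay() and consume() does not work as expected. The consumer that calls consume() never receives any item, even after waiting for some time.

I think something is wrong with my produceWithDelay implementation. What is the problem?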


Try declaring your queue as $channel->queue_declare("name", false, false, false, true, true, array()); the exchange could follow this gist - vardius
There is no need to implement this from scratch. This is how you should do it: https://dev59.com/U3zaa4cB1Zd3GeqPUcrQ#45549182 - Maksim Kotlyar
3 Answers

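First, run the command rabbitmq-plugins list and confirm that the rabbitmq_delayed_message_exchange plugin is enabled. If it is not, enable it first (see the plugin's documentation).
Second, you need to update your __construct method, because the queue has to be declared slightly differently. I am not going to rewrite your constructor, but here is my simple example:
Declaring the queue: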
<?php

require_once __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$args = new AMQPTable(['x-delayed-type' => 'fanout']);
$channel->exchange_declare('delayed_exchange', 'x-delayed-message', false, true, false, false, false, $args);
$args = new AMQPTable(['x-dead-letter-exchange' => 'delayed']);
$channel->queue_declare('delayed_queue', false, true, false, false, false, $args);
$channel->queue_bind('delayed_queue', 'delayed_exchange');

Sending a message:

$data = 'Hello World at ' . date('Y-m-d H:i:s');
$delay = 7000;
$message = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$headers = new AMQPTable(['x-delay' => $delay]);
$message->set('application_headers', $headers);
$channel->basic_publish($message, 'delayed_exchange');
printf(' [x] Message sent: %s %s', $data, PHP_EOL);
$channel->close();
$connection->close();

Receiving messages:

$callback = function (AMQPMessage $message) {
    printf(' [x] Message received: %s %s', $message->body, PHP_EOL);
    $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
};
$channel->basic_consume('delayed_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
    $channel->wait();
}
$channel->close();
$connection->close();

The source files are also available here.

Hope this helps!



Side note:

I found that this was caused by my own mistake.

Instead of

    if (is_null($this->delayedQueueName))
    {
        $delayedQueueName = $this->queueName . '.delayed';

        $this->channel->queue_declare($this->delayedQueueName, false, true, false, false, false,
        ...

        $this->delayedQueueName = $delayedQueueName;
    }

I should have written

    if (is_null($this->delayedQueueName))
    {
        $delayedQueueName = $this->queueName . '.delayed';

        $this->channel->queue_declare($delayedQueueName, false, true, false, false, false,
        ...

        $this->delayedQueueName = $delayedQueueName;
    }

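The member variable $this->delayedQueueName had not been initialized yet (it was still null) at the point where I passed it to queue_declare.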
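The ordering mistake can be reproduced without a broker. The following is only a sketch: DelayedNameDemo, declareBuggy, declareFixed and the queue name 'jobs' are hypothetical names made up for illustration, and the class merely records the value that would have been handed to queue_declare.

```php
<?php
// Hypothetical stand-in for the Amqp class above. It records the name that
// would be passed to queue_declare instead of talking to RabbitMQ.
class DelayedNameDemo
{
    private $queueName;
    private $delayedQueueName = null;
    public $declared = array();

    public function __construct($queueName)
    {
        $this->queueName = $queueName;
    }

    // Buggy order: reads the member before it has been assigned.
    public function declareBuggy()
    {
        $delayedQueueName = $this->queueName . '.delayed';
        $this->declared[] = $this->delayedQueueName; // still null here
        $this->delayedQueueName = $delayedQueueName;
    }

    // Fixed order: uses the local variable that already holds the name.
    public function declareFixed()
    {
        $delayedQueueName = $this->queueName . '.delayed';
        $this->declared[] = $delayedQueueName;
        $this->delayedQueueName = $delayedQueueName;
    }
}

$buggy = new DelayedNameDemo('jobs');
$buggy->declareBuggy();
var_dump($buggy->declared[0]); // NULL

$fixed = new DelayedNameDemo('jobs');
$fixed->declareFixed();
var_dump($fixed->declared[0]); // string(12) "jobs.delayed"
```

In the buggy variant the queue that gets declared is not the one the messages are later published to, which is consistent with the consumer never seeing anything.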

Here is the complete working code for your reference.

<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class Amqp
{
    private $connection;
    private $queueName;
    private $delayedQueueName;
    private $channel;
    private $callback;

    public function __construct($host, $port, $login, $password, $queueName)
    {
        $this->connection = new AMQPStreamConnection($host, $port, $login, $password);
        $this->queueName = $queueName;
        $this->delayedQueueName = null;
        $this->channel = $this->connection->channel();
        $this->channel->queue_declare($queueName, false, true, false, false);
    }

    public function __destruct()
    {
        $this->close();
    }

    public function close()
    {
        if (!is_null($this->channel)) {
            $this->channel->close();
            $this->channel = null;
        }

        if (!is_null($this->connection)) {
            $this->connection->close();
            $this->connection = null;
        }
    }

    public function produceWithDelay($data, $delay)
    {
        if (is_null($this->delayedQueueName))
        {
            $delayedQueueName = $this->queueName . '.delayed';

            $this->channel->queue_declare($delayedQueueName, false, true, false, false, false,
                new AMQPTable(array(
                    'x-dead-letter-exchange' => '',
                    'x-dead-letter-routing-key' => $this->queueName
                ))
            );

            $this->delayedQueueName = $delayedQueueName;
        }

        $msg = new AMQPMessage(
            $data,
            array(
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'expiration' => $delay
            )
        );

        $this->channel->basic_publish($msg, '', $this->delayedQueueName);
    }

    public function produce($data)
    {
        $msg = new AMQPMessage(
            $data,
            array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
        );

        $this->channel->basic_publish($msg, '', $this->queueName);
    }

    public function consume($callback)
    {
        $this->callback = $callback;

        $this->channel->basic_qos(null, 1, null);

        $this->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'callback'));

        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }
    }

    public function callback($msg)
    {
        call_user_func_array(
            $this->callback,
            array($msg)
        );

        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }
}
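One detail worth keeping in mind with this approach: RabbitMQ reads the per-message TTL from the expiration property as a string holding a number of milliseconds. A minimal sketch of normalizing the $delay argument before it is passed to AMQPMessage follows; delayToExpiration is a hypothetical helper invented here and is not part of php-amqplib.

```php
<?php
// Hypothetical helper (not part of php-amqplib): turns a delay given in
// milliseconds into the string form expected by the 'expiration' property.
function delayToExpiration($delayMs)
{
    if (!is_numeric($delayMs) || $delayMs < 0) {
        throw new InvalidArgumentException(
            'Delay must be a non-negative number of milliseconds.'
        );
    }

    // Round to a whole number of milliseconds, then cast to string.
    return (string) (int) round($delayMs);
}

echo delayToExpiration(7000), PHP_EOL;   // 7000
echo delayToExpiration(1500.4), PHP_EOL; // 1500
```

With such a helper, produceWithDelay could pass 'expiration' => delayToExpiration($delay) so that a negative or non-numeric delay fails early on the producer side.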
