兔子MQ - PHP AMQP断开连接的错误

5
我正在处理一个巨大的 XML 文档(其中包含约一百万条记录),然后使用 RabbitMQ 导入格式化后的版本到数据库。每次发布约 200,000 条记录后,我都会收到一个“broken pipe”错误,并且 RabbitMQ 无法从中恢复。

注意错误:fwrite():发送 2651 字节失败,errno=11,资源暂时不可用于[/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc,第 439 行]

注意错误:fwrite():发送 33 字节失败,errno=104,连接被对等方重置于[/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc,第 439 行]

注意错误:fwrite():发送 19 字节失败,errno=32,管道已断开于[/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc,第 439 行]

这随后会导致节点故障错误,需要手动杀死进程才能从中恢复。
以下是我的类方法:
public function publishMessage($message) {
    if (!isset($this->conn)) {
        $this->_createNewConnectionAndChannel();
    }
    try {
        $this->ch->basic_publish(
            new AMQPMessage($message, array('content_type' => 'text/plain')), 
            $this->defaults['exchange']['name'], 
            $this->defaults['binding']['routing_key']
        );
    } catch (Exception $e) {
        echo "Caught exception : " . $e->getMessage();
        echo "Creating new connection.";
        $this->_createNewConnectionAndChannel();
        $this->publishMessage($message); // try again
    }
}

protected function _createNewConnectionAndChannel() {
    if (isset($this->conn)) {
        $this->conn->close();
    }

    if(isset($this->ch)) {
        $this->ch->close();
    }

    $this->conn = new AMQPConnection(
        $this->defaults['connection']['host'], 
        $this->defaults['connection']['port'], 
        $this->defaults['connection']['user'], 
        $this->defaults['connection']['pass']
    );
    $this->ch = $this->conn->channel();
    $this->ch->access_request($this->defaults['channel']['vhost'], false, false, true, true);
    $this->ch->basic_qos(0 , 20 , 0); // fair dispatching

    $this->ch->queue_declare(
        $this->defaults['queue']['name'],
        $this->defaults['queue']['passive'],
        $this->defaults['queue']['durable'],
        $this->defaults['queue']['exclusive'],
        $this->defaults['queue']['auto_delete']
    );

    $this->ch->exchange_declare(
        $this->defaults['exchange']['name'],
        $this->defaults['exchange']['type'],
        $this->defaults['exchange']['passive'],
        $this->defaults['exchange']['durable'],
        $this->defaults['exchange']['auto_delete']
    );

    $this->ch->queue_bind(
        $this->defaults['queue']['name'],
        $this->defaults['exchange']['name'],
        $this->defaults['binding']['routing_key']
    );
}

非常感谢您的帮助。


你尝试过PECL AMQP扩展吗?从我的经验来看,它要稳定得多且维护得更好 - salathe
3个回答

10

确保您已为Rabbit MQ中的用户添加了虚拟主机访问权限。我创建了新用户,并忘记为默认使用的“ /”主机设置访问权限。

您可以通过管理面板yourhost:15672 >管理员>单击用户>查找“设置权限”来完成此操作。

附注:我假设您的RabbitMQ服务正在运行,用户存在且密码正确。


4
实际上,当您的消息内容较大且消费者花费了太多时间处理单个消息时,就会出现这个问题,这是向rabbit响应“ACK”并尝试消耗另一条消息的问题。
例如,当我遇到这个问题时,我会尝试“缩小”我的消息,因为它是一个产品工人,每个消息都有大约1k个产品ID,所以我将其更改为100个产品,效果非常好。
您可以在此处阅读有关使用心跳检测死亡TCP连接的更多信息

2

这个问题发生在我的RabbitMQ连接断开时(原因并不重要,我有意停止了RabbitMQ服务以进行一些失败测试),当我尝试通过关闭旧连接并初始化新连接重新连接到RabbitMQ时,我收到了一个 Broken pipe or closed connection 错误。

我解决这个问题的方法是在我的连接上使用 reconnect() 方法:

$channel->reconnect();

$channel->getConnection()->重新连接(); - Vasiliy Letuyev
我在执行 $channel->getConnection()->reconnect(); 后遇到了 "broken pipe" 错误,请问您知道原因吗? - Daart Kote

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接