PHP和AMQP RabbitMQ消费者

3
我有一个可以发布信息到RabbitMQ的PHP脚本,这些信息是从一个制表符分隔的文本文件中解析出来的。我直接将这个工作正常的代码从这个文件复制/粘贴到另一个文件,并希望建立一个消费者来获取发布到交换机的消息,对它们进行json_decode并将它们插入到数据库中。
所有尝试,甚至从PHP.net网站和SO中复制/粘贴示例代码都失败了,仅显示一个空白屏幕而没有错误消息,这甚至会导致php-fpm进程关闭。
请问为什么队列不会绑定,这里出了什么问题?
以下是一种尝试方法的示例,但我已经尝试过AMQP文档上PHP.net和SO的示例,但都无法工作。我可以正常发布,但当我尝试绑定队列时,它失败并且最终php-fpm被锁住了。
  • Nginx -> php-fpm
  • PHP 5.3.x
  • Macbook Pro (OSX Lion)
  • RabbitMQ (已安装librabbitmq和pecl amqp)
<?php
// Report all PHP errors
error_reporting(E_ALL);

/*****************************************
 * MQ settings
 ****************************************/
$mq = array(
           'host' => 'localhost',
           'port' => 5672,
           'login' => 'guest',
           'password' => 'guest',
           'exchange' => 'gbus.user',
           'routing_key' => 'gbus.test.mike',
           );

/*****************************************
 * Connect to queue
 ****************************************/

$conn_args = array('host' => $mq['host'], 'port' => $mq['port'], 'login' => $mq['login'], 'password' => $mq['password']);
$conn = new AMQPConnection($conn_args);
$conn->connect();

$ch = new AMQPChannel($conn);

// Create a new queue
$q = new AMQPQueue($ch);
$q->declare('test-queue');
$q->bind($mq['exchange'],$mq['routing_key']);

?>
<br>
<font color="blue" face="arial" size="4">File Contents</font>
<hr>
<?php 
while(true){
    $msg=$q->get();
    if ($msg['count']>-1){
        echo "\n--------\n";
        print_r($msg['msg']);
        echo "\n--------\n";
    }
    sleep(1);   
}
if (!$conn->disconnect()) {
    throw new Exception('Could not disconnect');
}
?>

这是我曾经用来发布到队列的示例代码,每次运行此代码后,我可以在RabbitMQ控制面板中看到20条新消息。出于测试目的,我将其限制为20条,但文件有成千上万行。

可用的发布代码:

<?php
/*****************************************
 * MQ settings
 ****************************************/
$mq = array(
           'host' => 'localhost',
           'port' => 5672,
           'login' => 'guest',
           'password' => 'guest',
           'exchange' => 'gbus.user',
           'routing_key' => 'gbus.test.mike',
           );

/*****************************************
 * Connect to queue
 ****************************************/

$conn_args = array('host' => $mq['host'], 'port' => $mq['port'], 'login' => $mq['login'], 'password' => $mq['password']);
$conn = new AMQPConnection($conn_args);
$conn->connect();

$ch = new AMQPChannel($conn);

$ex = new AMQPExchange($ch);
$ex->setName($mq['exchange']);


/*****************************************
 * Parse the file
 ****************************************/
$filename = "/tmp/Users.txt";
$board = "test";

$fd = fopen ($filename, "r");
$contents = fread ($fd,filesize ($filename));

fclose ($fd);
$delimiter = "\r\n";
$rows = explode($delimiter, $contents);
$counter = 0;
?>
<br>
<font color="blue" face="arial" size="4">File Rows (first 20)</font>
<hr>
<?php 
foreach ( $rows as $row )
{
    $counter++;
    echo "<b>Row $counter: </b> $row<br>";

    // build list columns
    list($login_name, $pwd, $account_type, $access_level, $status, $first_name, $last_name, $agent_code) = explode("\t", $row);

    // build assoc array for json
    $user = array("domain"=>$board, "username"=>$login_name, "user_id"=>$agent_code, "password"=>$pwd, "first_name"=>$first_name, "last_name"=>$last_name);

    // Publish a message to the exchange with a routing key
    $ex->publish(json_encode($user), $mq['routing_key'], AMQP_NOPARAM, array("content_type"=>"application/data"));

    if($counter == 20) {
        break;
    }
}

$ch->close();
$conn->close();
?>

我已经尝试了这里的所有示例并发布了它们,但消费失败:http://us3.php.net/manual/en/amqp.examples.php - Mike S.
我还将原始队列从交换机中解绑,以确保它没有“窃取”消息(通过控制面板无法确定是否已确认)。 - Mike S.
1
在rabbitmq文件夹的bin文件夹中,你可以找到rabbitmqctl。使用rabbitmqctl list_queues列出你的队列,使用list_exchanges列出交换机,使用list_bindings获取调试信息以帮助你解决问题。请查看http://www.rabbitmq.com/man/rabbitmqctl.1.man.html获取更多详细信息。将转储文件发布在此处,这将有助于找到问题所在。 - MaX
移除队列绑定调用后,我可以让它声明成功,但服务器会再次崩溃。每当我包括绑定时,它就失败了。不确定是否仅限于 Mac OSX Lion 问题;尚未在 Linux 上尝试过。很奇怪。 - Mike S.
php.net 上的 AMQP 文档已经过时(基本上没有更新),例如 $queue->declare 已经被弃用,但仍然存在于文档中。因此,在从中学习时要小心。如果您正在使用 rabbitMQ,则可以使用 "videlalvaro/php-amqplib"。 - voila
1个回答

0

工作队列是我正在尝试实现的内容,但使用Php、Pear等示例都失败了。你提供的链接没有提供PHP示例,而是引导到Rabbit网站上队列的Python示例。 - Mike S.
我发布的链接确实为我展示了PHP示例,我最近使用过它们,并且它们的表现完全符合预期。顺便说一下,我也在使用homebrew运行Mac OSX Lion盒子。 - williamvicary
请点击链接,然后将鼠标悬停在下面的“代码”部分,它们都是Python链接。但我明白你的意思。我会检查它们并报告。我想使用在PHP.net上记录的AMQP库,所以必须看看它们需要什么。 - Mike S.
这句话的意思是这些示例是RabbitMQ官网上官方教程的PHP版本 - 检查git hub文件的代码,它们都是PHP! - williamvicary

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接