RabbitMQ等待多个队列完成

18

好的,以下是正在发生的事情的概述:

    M <-- Message with unique id of 1234
    |
    +-Start Queue
    |
    |
    | <-- Exchange
   /|\
  / | \
 /  |  \ <-- bind to multiple queues
Q1  Q2  Q3
\   |   / <-- start of the problem is here
 \  |  / 
  \ | /
   \|/
    |
    Q4 <-- Queues 1,2 and 3 must finish first before Queue 4 can start
    |
    C <-- Consumer 

我有一个交换机,可以将消息推送到多个队列,每个队列都有一个任务。只有当所有的任务都完成后,队列4才能开始工作。

因此,带有唯一ID为1234的消息被发送到交换机,交换机将其路由到所有任务队列(Q1、Q2、Q3等)。当消息ID为1234的所有任务完成后,运行消息ID为1234的Q4。

该如何实现呢?

使用Symfony2、RabbitMQBundle和RabbitMQ 3.x。

资源:

更新 #1

好的,我想这就是我要找的:

使用并行处理的RPC,但是我该如何将关联ID设置为我的唯一ID以分组消息并确定是哪个队列?


所以,如果我理解正确的话,你有第四个队列,只有在另外3个队列为空时才能开始处理?如果你正在并行处理大量的事情,那么你的三个队列不是总是在传递信息吗? - afuzzyllama
是的,因为队列总是有新的消息,我忘了提到每个队列中的所有数据都由唯一标识符相关联。因此,交换机将唯一标识符1234发送到Q1、Q2和Q3。每个队列执行不同的任务。在Q4中,我需要知道在我处理Q4中的消息之前,Q1、Q2和Q3中具有唯一标识符1234的消息何时完成。更新了我的问题。 - Phill Pafford
5个回答

7
你需要实现这个功能:http://www.eaipatterns.com/Aggregator.html,但是Symfony的RabbitMQBundle不支持该功能,所以你需要使用底层的php-amqplib。
从bundle中获取的一个普通的消费者回调函数将会得到一个AMQPMessage。从那里,你可以访问通道并手动发布到管道和过滤器模式下的任何交换机。

5
在RabbitMQ网站的RPC教程中,有一种传递“相关ID”的方法,可以将您的消息标识给队列中的用户。
建议在前3个队列中使用某种ID,并使用另一个进程将消息从这3个队列中出列到某些桶中。当这些桶接收到这3个任务的完成(我假设是完成),将最终消息发送到第4个队列进行处理。
如果您向每个用户的队列中发送多个工作项,则可能需要进行一些预处理,以查找特定用户放入队列中的项目数,以便在排队之前,出列过程就能知道要期望多少项。
我使用C#进行我的rabbitmq,所以抱歉我的伪代码不是php风格的。
// Client
byte[] body = new byte[size];
body[0] = uniqueUserId;
body[1] = howManyWorkItems;
body[2] = command;

// Setup your body here

Queue(body)

// Server
// Process queue 1, 2, 3
Dequeue(message)

switch(message.body[2])
{
    // process however you see fit
}

processedMessages[message.body[0]]++;

if(processedMessages[message.body[0]] == message.body[1])
{
    // Send to queue 4
    Queue(newMessage)
}

回复更新 #1

不要认为客户端是一个终端,而应该将其视为服务器上的一个进程。因此,如果您在服务器上设置了一个RPC客户端(例如此类客户端),那么您只需要让服务器处理生成唯一用户ID并将消息发送到适当的队列即可:

    public function call($uniqueUserId, $workItem) {
        $this->response = null;
        $this->corr_id = uniqid();

        $msg = new AMQPMessage(
            serialize(array($uniqueUserId, $workItem)),
            array('correlation_id' => $this->corr_id,
            'reply_to' => $this->callback_queue)
        );

        $this->channel->basic_publish($msg, '', 'rpc_queue');
        while(!$this->response) {
            $this->channel->wait();
        }

        // We assume that in the response we will get our id back
        return deserialize($this->response);
    }


$rpc = new Rpc();

// Get unique user information and work items here

// Pass even more information in here, like what queue to use or you could even loop over this to send all the work items to the queues they need.
$response = rpc->call($uniqueUserId, $workItem);

$responseBuckets[array[0]]++;

// Just like above code that sees if a bucket is full or not

你能再解释一下吗?我理解RPC部分,但你是说要将RPC添加到Q1、Q2和Q3中? - Phill Pafford
不要使用id作为RPC的标识,而是将其用于分组在第四个队列前面等待处理的消息。你可能甚至不需要使用该id。你可以将用户id嵌入到消息体中。 - afuzzyllama
“Group Messages”是什么意思?我大概明白这个概念,但需要更多细节。 - Phill Pafford
我在我的问题中尝试编写一个简单的示例。当然还有很多需要填充的地方,但我认为它展示了我尝试采取的路线? - afuzzyllama
我已经更新了我的问题,我认为我正在寻找具有并行处理的RPC。您介意再看一下吗?+1鼓励。 - Phill Pafford

2
除了我的基于RPC的答案,我想再添加一个基于EIP聚合器模式的答案。
思路如下:一切都是异步的,没有RPC或其他同步操作。每个任务在完成时发送一个事件,聚合器订阅该事件。它基本上计算任务并在计数器达到预期数量(在我们的情况下为3)时发送task4消息。出于简单起见,我选择文件系统作为计数器的存储方式。您可以在那里使用数据库。
生产者看起来更简单。它只需要触发即可。
<?php
use Enqueue\Client\Message;
use Enqueue\Client\ProducerInterface;
use Enqueue\Util\UUID;
use Symfony\Component\DependencyInjection\ContainerInterface;

/** @var ContainerInterface $container */

/** @var ProducerInterface $producer */
$producer = $container->get('enqueue.client.producer');

$message = new Message('the task data');
$message->setCorrelationId(UUID::generate());

$producer->sendCommand('task1', clone $message);
$producer->sendCommand('task2', clone $message);
$producer->sendCommand('task3', clone $message);

任务处理器在完成工作后必须发送一个事件:
<?php
use Enqueue\Client\CommandSubscriberInterface;
use Enqueue\Client\Message;
use Enqueue\Client\ProducerInterface;
use Enqueue\Psr\PsrContext;
use Enqueue\Psr\PsrMessage;
use Enqueue\Psr\PsrProcessor;

class Task1Processor implements PsrProcessor, CommandSubscriberInterface
{
    private $producer;

    public function __construct(ProducerInterface $producer)
    {
        $this->producer = $producer;
    }

    public function process(PsrMessage $message, PsrContext $context)
    {
        // do the job

        // same for other
        $eventMessage = new Message('the event data');
        $eventMessage->setCorrelationId($message->getCorrelationId());

        $this->producer->sendEvent('task_is_done', $eventMessage);

        return self::ACK;
    }

    public static function getSubscribedCommand()
    {
        return 'task1';
    }
}

聚合器处理器:

<?php

use Enqueue\Client\TopicSubscriberInterface;
use Enqueue\Psr\PsrContext;
use Enqueue\Psr\PsrMessage;
use Enqueue\Psr\PsrProcessor;
use Symfony\Component\Filesystem\LockHandler;

class AggregatorProcessor implements PsrProcessor, TopicSubscriberInterface
{
    private $producer;
    private $rootDir;

    /**
     * @param ProducerInterface $producer
     * @param string $rootDir
     */
    public function __construct(ProducerInterface $producer, $rootDir)
    {
        $this->producer = $producer;
        $this->rootDir = $rootDir;
    }

    public function process(PsrMessage $message, PsrContext $context)
    {
        $expectedNumberOfTasks = 3;

        if (false == $cId = $message->getCorrelationId()) {
            return self::REJECT;
        }

        try {
            $lockHandler = new LockHandler($cId, $this->rootDir.'/var/tasks');
            $lockHandler->lock(true);

            $currentNumberOfProcessedTasks = 0;
            if (file_exists($this->rootDir.'/var/tasks/'.$cId)) {
                $currentNumberOfProcessedTasks = file_get_contents($this->rootDir.'/var/tasks/'.$cId);

                if ($currentNumberOfProcessedTasks +1 == $expectedNumberOfTasks) {
                    unlink($this->rootDir.'/var/tasks/'.$cId);

                    $this->producer->sendCommand('task4', 'the task data');

                    return self::ACK;
                }
            }

            file_put_contents($this->rootDir.'/var/tasks/'.$cId, ++$currentNumberOfProcessedTasks);

            return self::ACK;
        } finally {
            $lockHandler->release();
        }
    }

    public static function getSubscribedTopics()
    {
        return 'task_is_done';
    }
}

2

我有点不清楚你试图在这里实现什么。但是我可能会稍微修改设计,以便在所有消息从队列中清除后,将其发布到单独的交换机,该交换机将消息发布到第四个队列。


当然,您能指向一些关于此的文档吗?我如何知道所有消息(针对给定的唯一标识符)何时从所有队列中清除? - Phill Pafford
什么是唯一标识符?主题?您可以使用通道接口检查队列是否为空。 - robthewolf
我已经更新了我的问题,我认为我正在寻找具有并行处理的RPC,感谢您的努力。您是否有机会再次查看我的问题? - Phill Pafford
@PhillPafford 我没有看到你对努力的点赞 ;) - Vitalii Zurian

0

我可以向您展示如何使用enqueue-bundle实现它。

因此,请使用composer安装它并像其他bundle一样注册。然后进行配置:

// app/config/config.yml

enqueue:
  transport:
    default: 'amnqp://'
  client: ~

这种方法基于RPC。以下是操作步骤:

<?php
use Enqueue\Client\ProducerInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;

/** @var ContainerInterface $container */

/** @var ProducerInterface $producer */
$producer = $container->get('enqueue.client.producer');

$promises = new SplObjectStorage();

$promises->attach($producer->sendCommand('task1', 'the task data', true));
$promises->attach($producer->sendCommand('task2', 'the task data', true));
$promises->attach($producer->sendCommand('task3', 'the task data', true));

while (count($promises)) {
    foreach ($promises as $promise) {
        if ($replyMessage = $promise->receiveNoWait()) {
            // you may want to check the response here
            $promises->detach($promise);
        }
    }
}

$producer->sendCommand('task4', 'the task data');

消费者处理器长这样:
use Enqueue\Client\CommandSubscriberInterface;
use Enqueue\Consumption\Result;
use Enqueue\Psr\PsrContext;
use Enqueue\Psr\PsrMessage;
use Enqueue\Psr\PsrProcessor;

class Task1Processor implements PsrProcessor, CommandSubscriberInterface
{
    public function process(PsrMessage $message, PsrContext $context)
    {
        // do task job

        return Result::reply($context->createMessage('the reply data'));
    }

    public static function getSubscribedCommand()
    {
        // you can simply return 'task1'; if you do not need a custom queue, and you are fine to use what enqueue chooses. 

        return [
          'processorName' => 'task1',
          'queueName' => 'Q1',
          'queueNameHardcoded' => true,
          'exclusive' => true,
        ];
    }
}

将其作为一个标记为enqueue.client.processor的服务添加到您的容器中,并运行命令bin/console enqueue:consume --setup-broker -vvv

这是纯PHP版本.


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接