如何在Laravel队列中使用Amazon FIFO SQS?

12

亚马逊宣布了他们的新FIFO SQS服务,我想在Laravel队列中使用它来解决一些并发问题。

我创建了几个新队列并更改了配置。但是,我遇到了一个MissingParameter错误,上面写着:

The request must contain the parameter MessageGroupId.

所以我修改了文件vendor/laravel/framework/src/Illuminate/Queue/SqsQueue.php

public function pushRaw($payload, $queue = null, array $options = [])
{
    $response = $this->sqs->sendMessage(['QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload,
        'MessageGroupId' => env('APP_ENV', getenv('APP_ENV'))]);

    return $response->get('MessageId');
}

public function later($delay, $job, $data = '', $queue = null)
{
    $payload = $this->createPayload($job, $data);

    $delay = $this->getSeconds($delay);

    return $this->sqs->sendMessage([
        'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload, 'DelaySeconds' => $delay,
        'MessageGroupId' => env('APP_ENV', getenv('APP_ENV'))
    ])->get('MessageId');
}

我正在使用APP_ENV作为群组ID(它是一个单一的消息队列,所以实际上这并不重要。我只想让所有的东西都是先进先出)。

但我仍然得到相同的错误信息。我该如何修复它?任何帮助将不胜感激。

(顺便说一句,SDK中定义了sendMessage吗?我可以找到它的存根,但我没有找到详细的实现)

3个回答

23
我想指出给那些可能会遇到同样问题的人,尽管编辑SqsQueue.php可行,但它很容易被composer installcomposer update重置。另一个选择是为SQS FIFO实现一个新的Illuminate\Queue\Connectors\ConnectorInterface,然后将其添加到Laravel的队列管理器中。 我的方法如下:
  1. 创建一个新的SqsFifoQueue类,它继承Illuminate\Queue\SqsQueue,但支持SQS FIFO。
  2. 创建一个新的SqsFifoConnector类,它继承Illuminate\Queue\Connectors\SqsConnector,它将使用SqsFifoQueue建立连接。
  3. 创建一个新的SqsFifoServiceProvider,将SqsFifoConnector注册到Laravel的队列管理器中。
  4. SqsFifoServiceProvider添加到您的config/app.php文件中。
  5. 更新config/queue.php文件以使用新的SQS FIFO队列驱动程序。
示例:
  1. 创建一个新的SqsFifoQueue类,它继承Illuminate\Queue\SqsQueue,但支持SQS FIFO。
<?php

class SqsFifoQueue extends \Illuminate\Queue\SqsQueue
{
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $response = $this->sqs->sendMessage([
            'QueueUrl' => $this->getQueue($queue),
            'MessageBody' => $payload,
            'MessageGroupId' => uniqid(),
            'MessageDeduplicationId' => uniqid(),
        ]);

        return $response->get('MessageId');
    }
}
创建一个新的 SqsFifoConnector 类,继承 Illuminate\Queue\Connectors\SqsConnector,使用 SqsFifoQueue 建立连接。
<?php

use Aws\Sqs\SqsClient;
use Illuminate\Support\Arr;

class SqsFifoConnector extends \Illuminate\Queue\Connectors\SqsConnector
{
    public function connect(array $config)
    {
        $config = $this->getDefaultConfiguration($config);

        if ($config['key'] && $config['secret']) {
            $config['credentials'] = Arr::only($config, ['key', 'secret']);
        }

        return new SqsFifoQueue(
            new SqsClient($config), $config['queue'], Arr::get($config, 'prefix', '')
        );
    }
}
  • 创建一个新的SqsFifoServiceProvider,将SqsFifoConnector注册到Laravel队列管理器中。

  • <?php
    
    class SqsFifoServiceProvider extends \Illuminate\Support\ServiceProvider
    {
        public function register()
        {
            $this->app->afterResolving('queue', function ($manager) {
                $manager->addConnector('sqsfifo', function () {
                    return new SqsFifoConnector;
                });
            });
        }
    }
    
  • SqsFifoServiceProvider添加到您的config/app.php中。

  • <?php
    
    return [
        'providers'     => [
            ...
            SqsFifoServiceProvider::class,
        ],
    ];
    
  • 更新config/queue.php文件以使用新的SQS FIFO队列驱动程序。

  • <?php
    
    return [
    
        'default' => 'sqsfifo',
    
        'connections' => [
            'sqsfifo' => [
                'driver' => 'sqsfifo',
                'key'    => 'my_key'
                'secret' => 'my_secret',
                'queue'  => 'my_queue_url',
                'region' => 'my_sqs_region',
            ],
        ],
    ];
    

    此时,您的队列应支持 SQS FIFO 队列。

    无耻地自我推销:在上述步骤中,我创建了一个laravel-sqs-fifo composer包来处理此问题,在https://github.com/maqe/laravel-sqs-fifo


    1
    我不得不将我的前缀添加到 config/queue.php 中的 sqsfifo 连接中,但除此之外,这个功能非常完美! - Ben Poulson
    4
    “MessageGroupId” => uniqid() 这个操作会破坏 FIFO(先进先出) 队列的保证。为什么不在这种情况下使用普通队列呢?详见:https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html - Greg
    1
    @Greg,另一个更成熟且积极维护的Laravel SQS FIFO队列包是shiftonelabs/laravel-sqs-fifo-queue。希望这可以帮到你。 - alexkb
    @Greg 我看到的区别(除非我误解了文档)是,即使在单个生产者设置中,它仍然确保消息只被处理一次。对于我的用例,顺序并不重要,但标准队列不能保证消息不会被多次处理。因此,这种方法对我应该有效。希望我是正确的! - Dylan Buth

    3

    FIFO消息队列与标准AWS SQS队列的工作方式不同。

    处理FIFO队列需要单独的驱动程序。

    我曾面临同样的情况,下面这个软件包是我的救星。

    https://packagist.org/packages/shiftonelabs/laravel-sqs-fifo-queue

    queue.php中进行配置。

    'sqs-fifo' => [
                'driver' => 'sqs-fifo',
                'key' => env('SQS_KEY'),
                'secret' => env('SQS_SECRET'),
                'prefix' => env('SQS_PREFIX'),
                'queue' => env('SQS_QUEUE'),
                'region' => env('SQS_REGION'),
                'group' => 'default',
                'deduplicator' => 'unique',
            ],
    

    那么
    dispatch(new TestJob([]))->onQueue('My_Mail_Queue.fifo');
    

    注意: 你需要在你的应用程序中,在.env文件中指定默认队列名称。

    SQS_QUEUE=My_Default_queue.fifo
    

    此外,在监听器中,您需要指定应用程序中将使用的所有队列名称。(如果您在整个应用程序中使用相同的队列名称,则无需在监听器中指定队列名称。)
    php artisan queue:listen --queue=My_Default_queue.fifo,My_Mail_Queue.fifo,My_Message_Queue.fifo
    

    0
    除了需要 MessageGroupId 之外,它还需要一个 MessageDeduplicationId 或启用基于内容的去重。

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接