亚马逊 SQS 的唯一消息

3
我正在使用SQS作为视频编码队列,希望确保每个视频只执行一次编码。
SQS的工作方式很好,因为当消息排队时,它只会被一个线程接收。但是,可能会向队列发送多条消息,用于同一视频/编码,这意味着特定“编码”队列的消息内容将相同。
有没有办法进行去重,以确保对于特定队列,队列中的消息或从队列接收的消息是唯一的?
我想到的一个选项是在发送消息时为每种编码类型创建一个新队列。 因此,队列可以命名为类似于`encoding-video-id`的名称,其中仅有单个消息,并且我可以检查确保该队列尚不存在。唯一的“问题”是可能会创建数千个到数万个这些队列。

那么是什么原因导致您多次将相同的消息加入队列? - Mike Brant
使用案例是用户可以提交“编码”,将视频排队,极端情况下可能会多次触发,这将导致多个消息。 - dzm
刚刚注意到在 SQS 中可以创建“无限制”的队列,所以上面的选项可能可行。 - dzm
即使没有用户排队重复任务的可能性,SQS本身也不能保证消息的“仅一次”传递,而是保证“至少一次”,因此SQS本身可能会传递重复的消息。我认为这些问题的答案与您的问题相关:http://stackoverflow.com/questions/32386877/aws-how-to-guarantee-that-jobs-run-only-once和https://dev59.com/b2Yr5IYBdhLWcg3wq8CI - Mark B
@mbaird 我认为这可能是需要做的事情。基本上使用Redis中的原子操作,并在其上设置较低的TTL(在处理过程中更新)。可以简单地使用基于视频GUID的唯一键进行INCR并检查其是否存在。如果此项的TTL为20秒,而SQS的TTL为1分钟,两者都在每10秒处理一次作业时更新,我认为这应该解决了去重问题,并允许SQS的重试。 - dzm
可能是在SQS队列中使用多个消费者的重复问题。 - Krease
5个回答

3
在我看来,使用单个消息创建无限数量的队列是一个非常糟糕的设计,即使从理论上讲它也可以工作。
如果是我的话,我会尝试确保每个视频都有某种唯一标识符,即使用户“双击”处理按钮也是如此。
我设想了一个系统,其中包含具有唯一名称(例如GUID)的视频上传到S3,将消息放入队列,您的线程从队列中接收消息并进行编码,然后将视频写回到不同的S3存储桶,但具有相同的基本名称。
在处理任何视频之前,我首先会检查“输出存储桶”,以查看是否已经存在与匹配名称相对应的已编码视频,如果有,则跳过重新处理并删除该消息。
如果所有内容都在EC2本地磁盘上运行(而且您没有使用S3),则可以在硬盘上使用输入和输出目录来完成相同的操作(但这假定多台机器不进行处理)。
重要的是要记住,即使用户只提交了一次消息,SQS也可能会传递相同的消息。虽然很少见,但发生了,因此无论您设置什么系统,都需要确保在出现偶尔的重复情况时不会破坏任何内容。

每个视频都有一个唯一的GUID,不幸的是我们不在AWS中,所以一些更理想的工作流程无法使用。但即使有了唯一的GUID,检查编码视频的存在也行不通,因为在视频被编码之前,它可能需要一些时间才能显示出来。理想情况下,应该有一种机制来说“这个视频GUID是否在队列中或正在处理”以原子方式进行。我们肯定可以使用另一个服务或数据库,但那样就不像我想要的那样与SQS紧密耦合,可能会出现误报,我在使用其他队列方法时已经遇到过这种情况。 - dzm

2
无法确保 SQS 队列中消息的唯一性,也无法保证顺序。此外,拥有太多队列并不是一个好主意。
在我看来,您需要向系统添加另一个组件。某种元数据服务就足够了。它可以像这样工作:
- 在创建编码任务时(在将其添加到 SQS 之前),您可以将其写入元数据服务。 - 当工作程序接收到编码任务时,它会查询元数据服务,以查看任务是否已完成。 - 当工作程序完成编码任务时,它会在元数据服务中标记任务为已完成。
如果您正在将这些编码作业的输出上传到 S3,则可以有效地将 S3 本身用作元数据服务。如果每个视频都有唯一的名称/ID,则可以使用此唯一 ID 的键将输出保存在 S3 中。或者将其设置为 S3 元数据键值(这将使文件稍微难以找到,因为您无法仅查询 S3 元数据服务)。然后,当工作程序接收到编码任务时,它会检查文件是否已经存在于 S3 中,如果是,则会从 SQS 中删除该消息并跳过任务。
如果您没有将输出保存到 S3,则可能需要使用某种数据库。Dynamo DB 可能在速度和成本方面都有所帮助。
希望这可以帮到您! :)

这绝对是我考虑过的事情,但是有一个场景,由于各种原因,活动编码可能会失败(服务器故障、编程异常等)。SQS的好处在于我们可以使用可见性超时并在编码过程中更新它(对于长时间编码),但是如果任何无法处理的东西死亡,这条消息现在可以被重新处理,因为它应该。但是,如果我们有另一个服务检查作业的存在,这将防止其被重新处理。 - dzm
一种选择是在文档上使用MongoDB的TTL,并更新TTL,就像更新可见性超时一样。然而,如果它超过了SQS的VT,那么再次接收到的消息将会丢失,因为我们会将这些重复标记为已删除。 - dzm
当然,SQS的可行性超时和更新是为什么它非常适合这种批处理用例的原因。但我并不完全理解你的第一个评论。您不会阻止任何东西被重新处理,当然也不需要一个完整的“其他服务”来检查作业的存在。您只需在收到任务时立即检查您的元服务。如果成功执行了任务,您只会将任务标记为已完成。因此,没有任何东西可以阻止未能完成的任务被重新处理。 - mickzer
我认为这并不能解决防止多条消息被处理的问题。如果我们只是检查任务是否已完成,那么仍然有可能同时运行多个任务。 - dzm
1
哦,我明白了,你想防止多个工作人员同时运行相同的任务。那么你肯定需要使用类似 Dynamo 的东西。当每个工作人员收到一条消息时,他们会使用唯一标识符更新 Dynamo,以表示该任务已经在处理中。然后,当另一个工作人员出现并接收到相同任务的消息时,它会使用唯一标识符查询 Dynamo,并查看它是否已经在处理中。可以将其视为锁定机制。 - mickzer

2

SQS具有去重ID属性。在5分钟的时间窗口内发送具有相同去重ID的消息将被成功接收,但实际上不会添加到队列中。

您可以使用此功能来防止同一视频的额外排队。

即使消息已经被处理,如果具有相同去重ID的其他消息在时间窗口内发送,也不会被排队。同样地,如果您在时间窗口过后再次发送相同的ID,则该消息将重新排队,这也可能是不希望发生的。

然而,与其维护自己的排队视频缓冲区,使用去重ID应该可以满足您的要求。


1
你提出的解决方案是一个糟糕的设计,无论它是否可行。以下是我解决问题的方法。
我将使用数据库(可能是DynamoDB)来存储基于视频编码类型的唯一ID,并添加一个名为“状态”的列。当用户点击转换按钮时,首先我会检查数据库。如果项目不可用,则会将新记录推送到具有状态“正在转换” 的数据库中。然后将工作推入SQS。处理工作负载后,将数据库的状态更改为“已完成”。如果用户再次点击转换按钮,则根据数据库中的状态变量显示结果。

如果作业失败,SQS 需要重试会发生什么?当再次收到新消息时,Dynamo 的状态将为“转换中”。 - dzm
这可以由队列工作者处理。队列工作者将从队列中获取任务并开始转换。如果发生了什么问题,你只需处理异常。由于你还没有将工作删除,所以它仍然在队列中。因此,队列工作者会尝试再次执行相同的工作,直到成功为止。成功完成工作后,您可以从队列中删除消息并更新数据库。但要注意队列的可见超时时间,以避免重复工作。 - Madura Pradeep
我不知道你的转换过程是如何工作的,也不知道为什么你不能处理异常。我的建议是更符合架构规范的方式。即使你没有处理异常,除非你从队列中删除你的工作,否则只会有一个项目存在,对吧?所以你可以利用这个优势而不会有任何麻烦。 - Madura Pradeep
Amazon SQS具有与分布式多工作进程架构一起工作的能力。我建议您查看它是如何工作的。http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/Welcome.html 特别是请检查上述链接中的“多个写入器和读取器”部分。 - Madura Pradeep
如果这个回答解决了你的问题,请标记为答案。 - Madura Pradeep
显示剩余2条评论

1
有一种方法可以在从队列接收数据后仅检查唯一消息。我将在下面解释相同的内容。
假设您经常向单个SQS队列添加随机消息(无论任何ID或任何内容)。逻辑是在从队列接收消息时。
在创建ReceiveMessageRequest对象时,您可以指定AttributeNames。因此,将“ApproximateReceiveCount”属性添加到请求对象中。这将获取与从SQS队列获取的每条消息一起获取的“ApproximateReceiveCount”值。
现在,对于第一次读取的消息,“ApproximateReceiveCount”为1。否则,该值将大于1。因此,每次进行SQS读取时,您只能考虑那些消息。通过将请求对象的“MaxNumberOfMessages”属性设置为限制每次读取的最大消息数,以确保您不会在每次读取时获得巨大的负载(每个64 KB的有效负载块都将计费为1个请求)。
我知道,FIFO队列在某些情况下会做得更好。但是,它有一些限制-
  • 它的吞吐量有限(仅为每秒300个交易(TPS))
  • 目前仅支持两个地区(美国西部(俄勒冈州)和美国东部(俄亥俄州)地区)

请在下面找到C#代码,解释其逻辑-

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

namespace DriverDataPooler1
{
    class Program
    {
        AmazonSQSClient objClient = new AmazonSQSClient
                ("<AWSAccessKeyId>", "<AWSSecretAccessKey>", Amazon.RegionEndpoint.APSouth1);
        //Create New SQS Queue
        CreateQueueResponse queueResponse = new CreateQueueResponse();
        ListQueuesResponse objqueuesResponseList = new ListQueuesResponse();

        // Declare the request and response objects
        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest();
        ReceiveMessageResponse receiveMessageResponse = new ReceiveMessageResponse();

        static void Main(string[] args)
        {
            Program p1 = new Program();
            p1.getQueueData();
        }

        public void getQueueData(){

            objqueuesResponseList = objClient.ListQueues(new ListQueuesRequest());
            List<String> QueueList = objqueuesResponseList.QueueUrls;



            // Receive Message from SQS Queue
            if (QueueList.Any())
            {
                // I am only considering the first queue here as I have only one SQS queue
                receiveMessageRequest.QueueUrl = QueueList[0];
                receiveMessageRequest.WaitTimeSeconds = 20;

                //You can limit t6he number of messages to decrease the mayload amount (depends on the size of each message) 
                receiveMessageRequest.MaxNumberOfMessages = 10;
                receiveMessageRequest.AttributeNames = new List<string>() { "ApproximateReceiveCount" };
                receiveMessageResponse = objClient.ReceiveMessage(receiveMessageRequest);
                List<Message> result = receiveMessageResponse.Messages;
                if (result.Any())
                {
                    foreach (Message res in result)
                    {
                        // Checking for the messages that are read for the first time
                        if (Int16.Parse(res.Attributes["ApproximateReceiveCount"]) == 1)

                            // Process you messages here 
                            Console.WriteLine(res.Body);
                    }
                }
                else
                {
                    Console.WriteLine("You have no new messages in your SQS");
                }
            }
            else
            {
                Console.WriteLine("You have no available SQS");
            }
            Console.ReadKey();

        }
    }
}

如果您有任何进一步的查询,请评论。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接