如何在Python中使用boto库获取Amazon SQS中消息的接收计数?

9
我正在使用Python的boto库获取Amazon SQS消息。在异常情况下,我不会从队列中删除消息,以便为恢复临时故障提供更多机会。但是,我不想不断接收失败的消息。我想要做的是,在接收3次以上后删除消息,或者如果接收计数超过3,则不获取消息。
最优雅的方法是什么?
6个回答

7
至少有几种方法可以实现这个目标。
当您在boto中阅读消息时,您会收到一个Message对象或其某个子类。Message对象具有“attributes”字段,该字段是包含SQS已知的所有消息属性的字典。 SQS跟踪的其中一件事是消息被读取的次数的近似值。 因此,您可以使用此值来确定是否应删除消息,但您必须对该值的“近似”性感到满意。
或者,您可以在某种类型的数据库中记录消息ID,并在每次读取消息时递增数据库中的计数字段。如果消息始终在单个进程中读取,则可以在简单的Python字典中完成此操作,如果需要记录跨进程的读取,则可以在SimpleDB之类的东西中完成此操作。
希望这可以帮助您。
以下是一些示例代码:
>>> import boto.sqs
>>> c = boto.sqs.connect_to_region()
>>> q = c.lookup('myqueue')
>>> messages = c.receive_message(q, num_messages=1, attributes='All')
>>> messages[0].attributes
{u'ApproximateFirstReceiveTimestamp': u'1365474374620',
 u'ApproximateReceiveCount': u'2',
 u'SenderId': u'419278470775',
 u'SentTimestamp': u'1365474360357'}
>>>

谢谢您的回复。我已经浏览了Boto文档,但是没有找到获取消息接收计数的线索。您知道在哪里可以找到吗?顺便说一句,拥有一个简单的本地数据库是我管理这个问题的替代方法,但首先我想保持简单——如果可能的话,就像获取接收计数一样。 - huzeyfe
你能添加一个语法示例吗?当我多次读取一条消息并查看message_instance.attributes时,我只发现一个空字典。 - tponthieux

4
AWS内置了该功能,只需按照以下步骤操作即可:
  1. 创建一个死信队列
  2. 勾选“使用Redrive策略”,为源队列启用Redrive策略
  3. 选择您在第一步中创建的死信队列作为“死信队列”
  4. 将“最大接收次数”设置为“3”或任何介于1和1000之间的值
其工作原理是,每当工作者接收到一条消息时,接收计数会增加。一旦它达到“最大接收次数”的计数,该消息就会被推送到死信队列中。请注意,即使您通过AWS控制台访问消息,接收计数也会增加。
来源:使用Amazon SQS死信队列

4
另一种方法是在SQS队列中的消息末尾放置一个额外的标识符。此标识符可保持读取消息的次数计数。
另外,如果您不希望服务反复轮询这些消息,则可以创建另一个队列,称为“死信队列”,并将已超过阈值的消息转移到该队列中。

2

从您读取的消息中获取ApproximateReceiveCount属性。 将其移动到另一个队列(这样您就可以管理错误消息)或仅将其删除。

foreach (var message in response.Messages){
       try{
           var notifyMessage = JsonConvert.DeserializeObject<NotificationMessage>(message.Body);
                    Global.Sqs.DeleteMessageFromQ(message.ReceiptHandle);
           }
       catch (Exception ex){
           var  receiveMessageCount = int.Parse(message.Attributes["ApproximateReceiveCount"]);
           if (receiveMessageCount >3 ) 
              Global.Sqs.DeleteMessageFromQ(message.ReceiptHandle);
            }
        }

0

应该可以分几个步骤完成。

  1. 创建 SQS 连接: sqsconnrec = SQSConnection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
  2. 创建队列对象: request_q = sqsconnrec.create_queue("queue_Name")
  3. 加载队列消息: messages= request_q.get_messages()
  4. 现在您会得到消息对象的数组,并且要查找消息总数: 只需执行 len(messages)

应该能够完美地运作。


谢谢关注,但恐怕这不是我的情况。我不需要 len(messages),我需要某个消息的接收计数。 - huzeyfe
1
是的,在boto中没有定义函数来获取接收计数。我建议您维护一个本地元组,其中包含消息ID、计数和消息正文。在一定时间后从队列加载消息,检查元组中是否存在ID,如果是,则增加接收计数;如果接收计数>3,则可以删除该消息。我查看了消息对象的dir,它没有一个接收计数功能。希望这有助于解决您的问题。 - Avichal Badaya
这是我猜测的备选方案,现在我只有这个选择。非常感谢。 - huzeyfe
2
几点注释: #1. 使用元组来跟踪这个问题是一个可怕的方法,应该使用 collection.Counter,它是专门为解决这个问题而设计的字典。 #2. 维护任何类型的本地计数意味着无法以分布式方式操作,而这正是 SQS 促进的行为。 #3. 在许多情况下,队列可能太大,无法加载所有消息,您肯定也不希望单个工作人员接收所有消息。 - Endophage

0
如果你需要在比较“ReceiveCount”之后添加一些逻辑,你可以在2023年使用这个片段。
        sqs = boto3.client('sqs')
        response = sqs.receive_message(
            QueueUrl='queue_url',
            MaxNumberOfMessages=10,
            VisibilityTimeout=30,
            WaitTimeSeconds=10,
            AttributeNames=['ApproximateReceiveCount']
        )

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接