如何使用Python中的boto库获取Amazon SQS队列中的所有消息?

27

我正在开发一个应用程序,其工作流由使用boto在SQS中传递消息进行管理。

我的SQS队列正在逐渐增长,而我没有办法检查它应该包含多少个元素。

现在我有一个守护进程定期轮询队列,并检查是否有一组固定大小的元素。例如,考虑以下“队列”:

q = ["msg1_comp1", "msg2_comp1", "msg1_comp2", "msg3_comp1", "msg2_comp2"]
现在,我想检查一下队列中是否同时存在"msg1_comp1"、"msg2_comp1"和"msg3_comp1",但我不知道队列的大小。
通过查看API,似乎您只能获取队列中的一个元素或固定数量的元素,而不能获取所有元素。
>>> rs = q.get_messages()
>>> len(rs)
1
>>> rs = q.get_messages(10)
>>> len(rs)
10

回答中提出的一个建议是使用循环获取例如10个消息,直到我没有得到任何返回,但是SQS中的消息具有可见性超时,这意味着如果我从队列中轮询元素,它们不会真正被删除,它们只会在短时间内变为不可见。

是否有一种简单的方法可以获取队列中的所有消息,而不知道有多少个消息?

6个回答

29
我一直在使用AWS SQS队列来提供即时通知,因此需要实时处理所有消息。以下代码将帮助您高效地出列(所有)消息并在删除时处理任何错误。
注意:要从队列中删除消息,您需要将它们删除。我正在使用更新的boto3 AWS Python SDK、json库和以下默认值:
import boto3
import json

region_name = 'us-east-1'
queue_name = 'example-queue-12345'
max_queue_messages = 10
message_bodies = []
aws_access_key_id = '<YOUR AWS ACCESS KEY ID>'
aws_secret_access_key = '<YOUR AWS SECRET ACCESS KEY>'
sqs = boto3.resource('sqs', region_name=region_name,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key)
queue = sqs.get_queue_by_name(QueueName=queue_name)
while True:
    messages_to_delete = []
    for message in queue.receive_messages(
            MaxNumberOfMessages=max_queue_messages):
        # process message body
        body = json.loads(message.body)
        message_bodies.append(body)
        # add message to delete
        messages_to_delete.append({
            'Id': message.message_id,
            'ReceiptHandle': message.receipt_handle
        })

    # if you don't receive any notifications the
    # messages_to_delete list will be empty
    if len(messages_to_delete) == 0:
        break
    # delete messages to remove them from SQS queue
    # handle any errors
    else:
        delete_response = queue.delete_messages(
                Entries=messages_to_delete)

这里提供了一个适用于v2 Boto包的改编版本,以“回溯”从Boto3中导入的delete_messages函数。内置的Boto(2)delete_message_batch有10条消息的限制,并且需要完整的Message类对象,而不仅仅是对象中的IDReceiptHandles - mpag
谢谢您的回答,它对解决这个问题很有帮助 https://stackoverflow.com/questions/62681836/re-process-dlq-events-in-lambda/63206442 - Dos
我认为OP不需要立即从队列中删除消息。 - Memphis Meng

26

将你的调用q.get_messages(n)放在while循环中:

all_messages=[]
rs=q.get_messages(10)
while len(rs)>0:
    all_messages.extend(rs)
    rs=q.get_messages(10)

此外,dump也不支持超过10条消息

def dump(self, file_name, page_size=10, vtimeout=10, sep='\n'):
    """Utility function to dump the messages in a queue to a file
    NOTE: Page size must be < 10 else SQS errors"""

@linker - 你说你需要检查特定的消息。 这是否意味着有某些匹配条件,您正在将每个消息进行比较? - AJ.
@linker - 根据参考资料,可见超时时间最长可达12小时。除非您正在启动大规模的EC2作业,否则我猜这应该能满足您的需求?http://docs.amazonwebservices.com/AWSSimpleQueueService/2011-10-01/APIReference/Query_QueryReceiveMessage.html - AJ.
2
@linker - 顺便说一下,消息数量只应该是1到10。如果您使用其他数字,SQS服务应该会返回一个“ReadCountOutOfRange”错误。 - AJ.
即使2.1.1版本也有这些注释,很奇怪,我怀疑这是一些遗留的注释,因为除此之外,它即使在处理更多的消息时也能正常工作。 - Charles Menguy
确实很有趣。我会选择这段代码,它看起来与dump()函数非常相似,谢谢! - Charles Menguy
显示剩余6条评论

8
我的理解是SQS服务的分布式特性使您的设计基本上不可行。每次调用get_messages时,您都在与一组不同的服务器交流,这些服务器将具有一些但不是所有的消息。因此,不可能“不时地签到”以设置是否准备好特定组消息,然后只接受那些消息。
您需要做的是连续轮询,随时接收所有消息并将其存储在自己的数据结构中。在每次成功获取后,您可以检查自己的数据结构,看是否已收集完整的消息组。
请记住,消息将以无序方式到达,某些消息将被传递两次,并且删除操作必须传播到所有SQS服务器,但随后的获取请求有时会击败删除操作。

4

我会在cronjob中执行此操作。

from django.core.mail import EmailMessage
from django.conf import settings
import boto3
import json

sqs = boto3.resource('sqs', aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
         aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
         region_name=settings.AWS_REGION)

queue = sqs.get_queue_by_name(QueueName='email')
messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1)

while len(messages) > 0:
    for message in messages:
        mail_body = json.loads(message.body)
        print("E-mail sent to: %s" % mail_body['to'])
        email = EmailMessage(mail_body['subject'], mail_body['message'], to=[mail_body['to']])
        email.send()
        message.delete()

    messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1)

0

类似下面的代码应该可以解决问题。抱歉它是用C#编写的,但转换为Python不应该很难。字典用于筛选重复项。

    public Dictionary<string, Message> GetAllMessages(int pollSeconds)
    {
        var msgs = new Dictionary<string, Message>();
        var end = DateTime.Now.AddSeconds(pollSeconds);

        while (DateTime.Now <= end)
        {
            var request = new ReceiveMessageRequest(Url);
            request.MaxNumberOfMessages = 10;

            var response = GetClient().ReceiveMessage(request);

            foreach (var msg in response.Messages)
            {
                if (!msgs.ContainsKey(msg.MessageId))
                {
                    msgs.Add(msg.MessageId, msg);
                }
            }
        }

        return msgs;
    }

0

注意:这不是对问题的直接回答。 相反,它是@TimothyLiu's answer的补充,假设最终用户正在使用Boto包(又名Boto2),而不是Boto3。此代码是他的答案中引用的delete_messages调用的“Boto-2-化”。


Boto(2)调用delete_message_batch(messages_to_delete),其中messages_to_delete是一个dict对象,其键值对应于idreceipt_handle对,返回

AttributeError:“dict”对象没有“id”属性。

看起来 delete_message_batch 希望一个 Message 类对象; 如果你复制 delete_message_batch 的 Boto 源代码 并允许其使用非-Message 对象(比如 boto3),如果一次删除超过 10 条 "消息",也会失败。因此,我不得不使用以下解决方法。

这里 打印代码

from __future__ import print_function
import sys
from itertools import islice

def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)

@static_vars(counter=0)
def take(n, iterable, reset=False):
    "Return next n items of the iterable as same type"
    if reset: take.counter = 0
    take.counter += n
    bob = islice(iterable, take.counter-n, take.counter)
    if isinstance(iterable, dict): return dict(bob)
    elif isinstance(iterable, list): return list(bob)
    elif isinstance(iterable, tuple): return tuple(bob)
    elif isinstance(iterable, set): return set(bob)
    elif isinstance(iterable, file): return file(bob)
    else: return bob

def delete_message_batch2(cx, queue, messages): #returns a string reflecting level of success rather than throwing an exception or True/False
  """
  Deletes a list of messages from a queue in a single request.
  :param cx: A boto connection object.
  :param queue: The :class:`boto.sqs.queue.Queue` from which the messages will be deleted
  :param messages: List of any object or structure with id and receipt_handle attributes such as :class:`boto.sqs.message.Message` objects.
  """
  listof10s = []
  asSuc, asErr, acS, acE = "","",0,0
  res = []
  it = tuple(enumerate(messages))
  params = {}
  tenmsg = take(10,it,True)
  while len(tenmsg)>0:
    listof10s.append(tenmsg)
    tenmsg = take(10,it)
  while len(listof10s)>0:
    tenmsg = listof10s.pop()
    params.clear()
    for i, msg in tenmsg: #enumerate(tenmsg):
      prefix = 'DeleteMessageBatchRequestEntry'
      numb = (i%10)+1
      p_name = '%s.%i.Id' % (prefix, numb)
      params[p_name] = msg.get('id')
      p_name = '%s.%i.ReceiptHandle' % (prefix, numb)
      params[p_name] = msg.get('receipt_handle')
    try:
      go = cx.get_object('DeleteMessageBatch', params, BatchResults, queue.id, verb='POST')
      (sSuc,cS),(sErr,cE) = tup_result_messages(go)
      if cS:
        asSuc += ","+sSuc
        acS += cS
      if cE:
        asErr += ","+sErr
        acE += cE
    except cx.ResponseError:
      eprint("Error in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params))
    except:
      eprint("Error of unknown type in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params))
  return stringify_final_tup(asSuc, asErr, acS, acE, expect=len(messages)) #mdel #res

def stringify_final_tup(sSuc="", sErr="", cS=0, cE=0, expect=0):
  if sSuc == "": sSuc="None"
  if sErr == "": sErr="None"
  if cS == expect: sSuc="All"
  if cE == expect: sErr="All"
  return "Up to {} messages removed [{}]\t\tMessages remaining ({}) [{}]".format(cS,sSuc,cE,sErr)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接