从AWS SQS队列中轮询并删除已接收消息的最佳实践是什么?

27

我有一个SQS队列,它不断地被数据消费者填充,现在我正在尝试使用Python的boto创建从SQS中拉取这些数据的服务。

我的设计方式是我将有10-20个线程都尝试从SQS队列中读取消息,然后在对数据进行操作(业务逻辑)之前做他们需要做的事情,完成后返回到队列获取下一批数据。如果没有数据,它们将等待直到有可用数据。

我有两个方面不确定的地方:

  1. 我是否只需使用较长的time_out值调用receive_message()方法,如果在20秒(最大允许时间)内没有返回任何内容,则重试?还是有一种阻塞的方法只返回一次数据可用?
  2. 我注意到一旦我收到一条消息,它就不会从队列中删除,我是否需要接收一条消息,然后在接收到消息后发送另一个请求将其从队列中删除?这似乎有点过度处理了。

谢谢

3个回答

29

receive_message()方法的长轮询功能是轮询SQS最有效的方式。如果它没有返回任何消息,我建议在重试之前等待一小段时间,特别是如果你有多个读取器。你甚至可以进行增量延迟,使得每个后续的空读取等待更长的时间,这样你就不会被AWS限制。

是的,在读取完消息后你必须删除它,否则它将重新出现在队列中。在工作者读取消息并在无法完全处理消息之前失败的情况下,这实际上非常有用。在这种情况下,它将被重新排队并由另一个工作者读取。您还需要确保消息的不可见超时设置足够长,以便工作者有足够的时间在消息自动重新出现在队列上之前处理消息。如果有必要,您的工作者可以在处理过程中调整超时时间,以防它花费的时间比预期要长。


12
如果您想简单地设置一个包含自动删除已处理消息和将异常自动推送到指定队列的侦听器,可以使用pySqsListener包。
您可以像这样设置一个侦听器:
from sqs_listener import SqsListener

class MyListener(SqsListener):
    def handle_message(self, body, attributes, messages_attributes):
        run_my_function(body['param1'], body['param2']

listener = MyListener('my-message-queue', 'my-error-queue')
listener.listen()

有一个开关可以从短轮询切换到长轮询——这在 README 文件中都有详细说明。

免责声明:我是该软件包的作者。


7
另一种选择是使用AWS Beanstalk设置工作应用程序,如此博客文章所述。
您的flask应用程序不再使用boto3进行长轮询,而是以HTTP post中的json对象形式接收消息。 AWS Elastic Beanstalk配置选项卡中可配置HTTP路径和设置的消息类型: enter image description here AWS Elastic Beanstalk可以根据SQS队列的大小动态扩展工作人员数量,同时具有部署管理优势。 这个是我发现有用的模板示例应用程序。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接