从AWS Lambda读取SQS队列

13
我有以下基础设施:
我有一个在端口上监听消息的NodeJS+Express进程(进程1)的EC2实例。每次该进程接收到一条消息,它都会将其发送到SQS队列。然后我在同一台机器上拥有另一个进程使用长轮询方式读取队列(进程2)。当它在队列中找到一条消息时,它会将数据插入到位于RDS实例上的MariaDB数据库中。
仅为澄清,消息由用户生成,他们发送包含任意信息的数据块到进程1所在的端点
现在我想将读取SQS的进程(进程2)放入Lambda函数中,以便写入队列和读取队列的进程完全独立。问题是我不知道这是否可能。
我知道Lambda函数是响应事件而调用的,目前支持的事件类型有S3、SNS、SES、DynamoDB、Kinesis、Cognito、CloudWatch和Cloudformation,但不支持SQS
我想使用SNS通知来调用Lambda函数,以便每次将消息推送到队列时,都会触发SNS通知并调用Lambda函数,但是在尝试一番后,我意识到无法从SQS创建SNS通知,只能将SNS通知写入队列。
现在我有点困惑,因为我不知道该如何继续。由于AWS服务的当前限制,我感觉无法创建此基础架构。是否有其他方法可以实现我的目标,或者我已经走到了死胡同?
仅为了扩展我的问题,这个github存储库展示了如何从Lambda函数中读取SQS队列,但是lambda函数仅在从命令行触发时起作用:

https://github.com/robinjmurphy/sqs-to-lambda

在自述文件中,作者提到了以下内容:
更新:Lambda现在支持SNS通知作为事件源,这使得对于SNS通知来说,这个hack变得完全不必要。如果你喜欢使用Lambda函数来处理SQS队列上的作业,你可能仍然会发现它有用。
但我认为这并不能解决我的问题,SNS通知可以调用Lambda函数,但我不知道如何在接收到SQS队列中的消息时创建通知。
谢谢

1
有两件事可以帮你解决问题:(1)Lambda 可以监听 SNS。如果不是你想要的,那么(2)将 SQS 队列设为 SNS 主题的一个订阅者 [每个 SNS 消息都会被写入 SQS 队列]。 - Naveen Vijay
我认为这里是我开始感到困惑的地方。我不想将SNS消息写入队列。进入队列的消息是由用户生成的(他们向URL发布数据,我的nodejs线程处理请求,格式化数据并将其发送到SQS队列)。然后,我想要做的是,每次插入用户消息到队列中时,以某种方式触发SNS通知来调用Lambda函数(实际上,是通过使Lambda函数监听SNS来完成的)。 - mIwE
2
不要将Lambda、SQS和SNS之间的关系联系起来,我建议考虑安排Lambda函数查看队列并在存在项目时处理它们。另一种选择是使用2个Lambda函数-一个从SQS中读取项目[已安排]并将项目推送到SNS,然后由另一个处理Lambda函数处理。 - Naveen Vijay
这很有道理。实际上,这个解决方案类似于我们目前使用cronjobs每隔几分钟或几秒钟检查队列的系统。 我也在考虑使用CloudWatch来检查队列统计信息,并在有任何消息时触发lambda函数。 感谢您的建议。我会进一步调查。 - mIwE
很高兴能够帮到你。我会把它写成一个答案。 - Naveen Vijay
5个回答

10

有几种策略可以用来连接各种组件: (A)同步或者 Run-Sleep-Run的方式来保持SNS、SQS和Lambda之间的数据流。

策略1:使用一个Lambda函数实时监听SNS并处理它 [请注意,SQS队列可以订阅SNS主题 - 这可能有助于日志记录/审计/重试处理]

策略2:假设您正在获取来源于SQS队列的数据。您可以尝试使用两个Lambda函数[Feeder和Worker]。

Feeder将是定期执行的Lambda函数,其工作是从SQS中获取项目(如果有的话)并将其推送为一个SNS主题(并继续永久执行)

Worker将与监听SNS主题相关联,它将进行实际的数据处理


1
有没有任何示例代码可以展示这两种策略中的任何一种? - astone26
策略2的好例子可以在这里找到:https://cloudonaut.io/integrate-sqs-and-lambda-serverless-architecture-for-asynchronous-workloads/ - jjanczyszyn

4

1
很好的发现。这里提供了一个教程:https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configure-lambda-function-trigger.html - Keet Sugathadasa

2
AWS SQS是亚马逊最古老的产品之一,直到2018年6月仅支持轮询(长和短)。正如此答案中提到的那样,AWS SQS现在支持触发Lambda函数以响应SQS新消息的到达。有关此功能的完整教程可在此文档中找到。
我曾经使用不同的机制来解决这个问题,以下是您可以使用的一些方法。
  1. 您可以在Lambda中开发一个简单的轮询应用程序,并使用AWS CloudWatch每5分钟或更长时间调用它。您可以使用CloudWatch事件使lambda实现短暂的停机时间,从而使其接近实时。使用此教程此教程来实现此目的。(这可能会在Lambdas上花费更多)
  2. 如果您不需要持久化消息或保证传递顺序,则可以考虑SQS是多余的。您可以使用AWS SNS(Simple Notification Service)直接调用lambda函数并执行所需的处理。使用此教程来实现此目的。这将实时发生。但是,主要缺点是每个区域可以在给定时间内启动的lambda数量。在遵循此方法之前,请阅读此文档并了解限制。尽管如此,AWS SNS保证传递顺序。此外,SNS可以直接调用HTTP端点并将消息存储在您的数据库中。

1

我曾经遇到过类似的情况(现在已经有了可用的解决方案)。我是通过以下方式解决的:

enter image description here

即将发布事件到SNS,然后扇出到Lambda和SQS。
注意:这不适用于必须按特定顺序处理的事件。
其中有一些需要注意的问题(可能有解决方案),例如:
  • 竞态条件:在将消息存入队列之前,lambda可能会被调用
  • SQS队列的分布式特性可能导致返回零条消息,即使有一条消息note1。
解决这两种情况的方法是对SQS队列进行长轮询;但这会使您的Lambda账单更昂贵。
注1

短轮询是默认行为,即在ReceiveMessage调用时对一组加权随机的机器进行抽样。这意味着仅返回抽样机器上的消息。如果队列中的消息数量很少(小于1000),则您每次ReceiveMessage调用可能会收到比请求的消息数量少的消息。如果队列中的消息数量极少,则可能在特定的ReceiveMessage响应中不会收到任何消息;此时应重复请求。 http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html


0

你能否简要概括一下你提供的链接中的信息?虽然你提供了链接很好,但如果链接失效,那么答案从那时起就变得无用了。谢谢! - roelofs
当然,基本上我们开源了一个项目,可以在Github上找到:https://github.com/iopipe/sqs-to-lambda-async。这使您能够通过SQS异步触发Lambda函数。 - adjohn

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接