AWS Lambda 函数通过 SQS 触发器被调用时,未能尊重我在函数内手动设置的可见性超时时间。

3
我正在实现自己的Webhooks服务,该服务将向已订阅的Webhooks发送事件。
架构概述:
- 事件被推送到SQS队列。 - SQS消息触发Lambda函数(事件源映射)。 - 对于每个事件,我都会向订阅的Webhooks发出传出HTTP请求。 - 非2xx响应必须进行指数退避重试(在这种情况下,我更改接收到的消息上的消息可见性)。 - 由于由SQS调用的Lambda函数会在完成后自动删除消息,因此我在函数末尾抛出错误以防止自动删除。
据我所知,更改消息可见性的调用是成功的。我想知道是否还有其他内置于SQS调用的Lambda函数中的内容。当Lambda函数失败时,它是否在内部再次更改消息可见性?或者由SQS调用的Lambda是否不尊重消息可见性更改(这对我来说真的毫无意义)。如果有人对此问题有任何见解,请告诉我。我很惊讶地发现Lambda在成功后自动删除消息,因为它让我的特定用例感觉有点笨拙 - 抛出错误以使Lambda函数失败以防止消息被删除。
提前致谢!
3个回答

2
SQS与Lambda的集成方式是由集成控制消息轮询。用于确定消息是否应删除的机制是Lambda的响应不是错误。虽然在文档中没有明确说明,但我认为当出现错误时,集成将把可见性超时设置为零,以便立即让另一个进程接收它。因此,在您的示例中,您将其设置为允许重试的某个数字,但是当返回错误时,集成会将超时设置回零。如果您需要更多地控制该过程,您可能不应使用该集成。

我担心情况就是这样。看起来他们在文档中确实提到了,但是说得很隐晦。“当Lambda读取一个批量时,消息会保留在队列中,但会在队列的可见性超时时间内隐藏起来。如果您的函数成功处理了该批量,Lambda将从队列中删除这些消息。如果您的函数被限流、返回错误或者没有响应,那么这条消息会重新变为可见状态。所有在批量处理失败的消息都会返回到队列中,因此您的函数代码必须能够多次处理同一条消息而不产生副作用。” - TimJ
它说了,但没有明确说明它会重置超时时间来执行此操作。 - Jason Wadsworth
有一个注释指出了Lambda在调用之间不会重置可见性超时的至少一个条件。(来源):
为了让函数有足够的时间处理每个记录批次,请将源队列的可见性超时设置为至少是您在函数上配置的超时时间的6倍。额外的时间可以让Lambda在函数处理前一批数据时进行重试,以防止函数执行被限制。
- Daniel

2

更新:事实并非如此。我没有正确等待调用以调整超时时间完成。因此,Lambda在该请求完成之前关闭。我在lambda中设置的消息超时时间得到了尊重。然后我会抛出错误以防止消息被删除。


你应该把这标记为你问题的答案。 - Daniel
我对下面的答案和这个答案有点困惑。在Lambda内部是否可以更改消息的可见性,以便处理时间较长的消息,或者这样做会失败,因为Lambda独立处理轮询? - rvwsd

0
由于SQS触发器对于批量处理消息,标准的forEach中使用await是无效的(如果回调函数是异步的,forEach不会等待它们)。为了避免这个问题,你可以创建自己的异步forEach版本:
var AWS = require('aws-sdk');

exports.handler = async function(event, context) {
    var sqs = new AWS.SQS({apiVersion: '2012-11-05'});
    
    await asyncForEach(event.Records, async record => {
        const { receiptHandle } = record;
            const sqsParams = {
              QueueUrl: '<YOUR_QUEUE_URL>', /* required */
              ReceiptHandle: receiptHandle, /* required */
              VisibilityTimeout: '<in seconds>' /* required */
            };
            try {
                let res = await sqs.changeMessageVisibility(sqsParams).promise();
                console.log(res);
            } catch (err) {
                console.log(err, err.stack);
                throw new Error('Fail.');
            }
        }
    });
    return {};
};

async function asyncForEach(array, callback) {
  for (let index = 0; index < array.length; index++) {
    await callback(array[index], index, array);
  }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接