如何在Lambda中批量失败特定的SQS消息?

32

我有一个带SQS触发器的Lambda。当它被触发时,来自SQS的一批记录会进入(通常一次约有10个)。如果我从处理程序返回失败状态代码,则所有10条消息都将重试。如果我返回成功代码,则它们将全部从队列中删除。如果这10条消息中有1条失败了,并且我想重试只有那一条,该怎么办?

exports.handler = async (event) => {

    for(const e of event.Records){
        try {
            let body = JSON.parse(e.body);
            // do things
        }
        catch(e){
            // one message failed, i want it to be retried
        }        
    }

    // returning this causes ALL messages in 
    // this batch to be removed from the queue
    return {
        statusCode: 200,
        body: 'Finished.'
    };
};

我需要手动将那个消息重新添加到队列吗?还是可以从我的处理程序返回一个指示一个消息失败应该重试的状态?


2
很遗憾,目前仍然没有简单的方法来处理这种情况。 - ysfaran
6个回答

15
根据AWS的文档,SQS事件源映射现在支持开箱即用地处理部分失败。链接文章的要点如下:
  1. 在您的EventSourceMapping配置中包含ReportBatchItemFailures
  2. 在失败的情况下,响应语法必须进行修改:
{
  "batchItemFailures": [
    { "itemIdentifier": "id2" },
    { "itemIdentifier": "id4" }
  ]
}

id2和id4是批处理中失败的消息ID。

  1. 引用文档如下:

如果您的函数返回以下任何一项,Lambda将视批处理为完全成功:

  • 一个空的batchItemFailure列表
  • 一个空的EventResponse
  • 一个null的batchItemFailure列表
  • 一个null的EventResponse

如果您的函数返回以下任何一项,Lambda将视批处理为完全失败:

  • 无效的JSON响应
  • 一个空字符串的itemIdentifier
  • 一个null的itemIdentifier
  • 一个具有错误键名的itemIdentifier
  • 一个具有不存在的消息ID的itemIdentifier

2
第一步非常重要。确保在云形成模板中的Lambda的EventSourceMapping资源中添加"FunctionResponseTypes": ["ReportBatchItemFailures"] - Dave

8

是的,您必须手动将失败的消息重新添加到队列中。

我建议设置一个故障计数器,因此如果所有消息都失败,则可以针对所有消息返回失败状态,否则,如果故障计数器小于10,则可以逐个将失败的消息发送回队列。


1
谢谢。我找到了这篇文章,我已经根据我的使用情况进行了调整。 - user2719094
2
如果我们换个思路:从队列中删除成功的,失败其余的(这意味着在批处理中但未被从队列中删除的任务将重新运行),那会有什么效果吗? - Victor Ferreira
2
是的,您也可以使用删除消息调用来实现这一点。不过,最好采用另一种方式,因为在大多数情况下,您不会出现任何故障,所以最优的方法是仅手动发送失败的那些消息,否则您总是需要对成功的消息进行deleteMessage I/O调用,这将是昂贵和低效的。 - Deiv
1
有点晚了,但是只有在失败计数不等于消息计数且大于0的情况下才能进行单个删除。这样当一切正常时,您就可以避免额外的I/O操作。 - pkunal7

2

在成功处理每条消息后,您需要以编程方式删除每条消息。

因此,如果任何一条消息失败,您可以设置一个标志为true,并根据它在批处理中处理所有消息后引发错误,因此成功的消息将被删除,其他消息将根据重试策略重新处理。

因此,根据以下逻辑,只有失败和未处理的消息将得到重试。

import boto3

sqs = boto3.client("sqs")

def handler(event, context):
    for message in event['records']:
        queue_url = "form queue url recommended to set it as env variable"
        message_body = message["body"]
        print("do some processing :)")
        message_receipt_handle = message["receiptHandle"]
        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=message_receipt_handle
        )

还有另一种方法可以将成功处理的消息ID保存到变量中,并根据消息ID执行批量删除操作。

response = client.delete_message_batch(
    QueueUrl='string',
    Entries=[
        {
            'Id': 'string',
            'ReceiptHandle': 'string'
        },
    ]
)

如果在所有消息处理完成后引发错误,它仍将重试所有消息(无论哪个失败)。在 Lambda 执行期间发生任何错误都会导致批处理中的每条消息重试。 - ysfaran
1
重点是通过编程方式删除已成功处理的消息,因此如果引发异常,则只会重试未处理和失败的消息。 - Prashanna
2
抱歉,我觉得我误解了你的回答,但这似乎是一个相当不错的解决方案。如果真的有效,如果您能提供一个示例,那将非常有帮助。我认为这是最好的“简单”解决方案,因为您不必更改原始消息的消息正文,也不需要在Lambda函数中包含重试逻辑,而只需使用AWS的内置重试机制即可。 - ysfaran

1
根据AWS文档,您现在可以实现部分批次响应。
以下是使用Java的代码示例。请注意,根据文档,您首先需要激活Lambda函数的ReportBatchItemFailures功能。
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
 
import java.util.ArrayList;
import java.util.List;
 
public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> {
    @Override
    public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
 
         List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>();
         String messageId = "";
         for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) {
             try {
                 //process your message
                 messageId = message.getMessageId();
             } catch (Exception e) {
                 //Add failed message identifier to the batchItemFailures list
                 batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(messageId));
             }
         }
         return new SQSBatchResponse(batchItemFailures);
     }
}

0
AWS 支持部分批处理响应。以下是 Typescript 代码示例。
type Result = {
  itemIdentifier: string
  status: 'failed' | 'success'
}

const isFulfilled = <T>(
  result: PromiseFulfilledResult<T> | PromiseRejectedResult
): result is PromiseFulfilledResult<T> => result.status === 'fulfilled'

const isFailed = (
  result: PromiseFulfilledResult<Result>
): result is PromiseFulfilledResult<
  Omit<Result, 'status'> & { status: 'failed' }
> => result.value.status === 'failed'

const results = await Promise.allSettled(
 sqsEvent.Records.map(async (record) => {
   try {
     return { status: 'success', itemIdentifier: record.messageId }
   } catch(e) {
     console.error(e);
     return { status: 'failed', itemIdentifier: record.messageId }
   }
  })
)

return results
    .filter(isFulfilled)
    .filter(isFailed)
    .map((result) => ({
      itemIdentifier: result.value.itemIdentifier,
    }))


0

你需要以不同的方式设计你的应用程序,这里有一些想法,虽然不是最好的,但可以解决你的问题。

解决方案1:

  • 创建 sqs 传递队列 - sq1
  • 根据延迟需求创建延迟队列 sq2
  • 创建死信队列 sdl
  • 现在,在 lambda 函数内,如果消息在 sq1 中失败,则在 sq1 上将其删除并将其放在 sq2 上进行重试。任何异步调用的 Lambda 函数都会在事件被丢弃之前重试两次。如果重试失败,则将其移至死信队列 sdl。

  • 如果再次失败,则将其移到死信队列 sdl 中。

注意:当SQS事件源映射最初被创建并启用,或在一段时间内没有流量后首次出现时,Lambda服务将开始使用五个并行的长轮询连接来轮询SQS队列,根据AWS文档,AWS Lambda到SQS的长轮询默认持续时间为20秒。

解决方案二:

使用 AWS StepFunction

StepFunction 会调用 Lambda 并处理失败的重试逻辑,如果需要,可以配置指数级退避。

**解决方案3:**

使用CloudWatch计划事件来触发一个Lambda函数,该函数轮询是否失败。

给定事件源的错误处理取决于Lambda如何被调用。Amazon CloudWatch事件以异步方式调用您的Lambda函数。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接