亚马逊Kinesis和AWS Lambda重试

34

我对Amazon Kinesis非常陌生,因此可能是我的理解有误,但在AWS Lambda FAQ中提到:

发送到 AWS Lambda 函数的 Amazon Kinesis 和 DynamoDB Streams 记录是严格序列化的,每个分片都是如此。这意味着,如果将两个记录放入同一分片中,Lambda 保证您的 Lambda 函数将在第一个记录成功调用之前被成功调用第二个记录。如果其中一个记录的调用超时、被限流或遇到任何其他错误,Lambda 将重试,直到成功(或记录达到其 24 小时过期时间)再继续处理下一个记录。不保证跨不同分片的记录顺序,并且每个分片的处理是并行的。

我的问题是,如果由于生产者导致某些格式不正确的数据被放置到分片上,并且当 Lambda 函数拾取它时发生错误,然后一直重试会发生什么?这将导致该特定分片的处理因出错而被阻塞 24 小时。

最佳实践是否是通过在自定义错误中包装问题并将此错误与所有成功处理的记录一起向下游发送并让使用者处理来处理应用程序错误?当然,在像空指针这样导致程序崩溃的不可恢复性错误的情况下,这仍然无法解决问题:我们将再次回到阻塞重试循环,直到接下来的 24 小时。

3个回答

40

不要想太多,Kinesis只是一个队列。你必须成功消费一条记录(即从队列中弹出)才能继续进行下一条记录的处理。就像一个FIFO栈。

适当的方法应该是:

  • 从流中获取一条记录。
  • 在try-catch-finally块中处理它。
  • 如果记录被成功处理,没有问题。 <- TRY
  • 但是如果处理失败,将其记录到另一个地方以便调查失败的原因。 <- CATCH
  • 在逻辑块的结尾处始终将位置持久化到DynamoDB中。 <- FINALLY
  • 如果系统内部发生错误(内存错误、硬件错误等),那就是另外一回事;因为它可能会影响处理所有记录,而不仅仅是一条记录。

顺便说一句,如果处理一条记录需要超过1分钟,那么显然你做错了什么。因为Kinesis设计用于每秒处理数千条记录,你不应该有这样长的处理时间。

你正在问的问题是队列系统的一般问题,有时被称为“毒性消息”。你必须在业务逻辑中处理它们以确保安全。

http://www.cogin.com/articles/SurvivingPoisonMessages.php#PoisonMessages


1
听起来很合理,但是关于 DynamoDb 部分有一个快速的问题,为什么我需要持久化位置(我想你指的是序列号)? - Stefano
1
因为当您停止“Kinesis Consumer Application”节点并稍后重新启动时,您应该能够从上次停止的位置继续。 - az3
啊,是的,那很有道理。 - Stefano
两个答案都很好,表达的意思也相似,但我会选择@az3的答案,因为他回答得更快。 - Stefano
在worker.java中,它调用runProcessLoop,在其中它调用shardConsumer.consumeShard(),然后调用checkAndSubmitNextTask(),在其中检查准备好进行下一个任务否。如果没有准备好,它就不会消耗新记录。那么如何使worker检索新记录而无需处理先前的recordprocessor呢? - user1846749

24

这是有关在Kinesis中处理事件的常见问题,我将尝试为您提供一些要点,以构建Lambda函数来处理带有“损坏”数据的问题。由于最佳实践应将系统的不同部分分别写入Kinesis流和从Kinesis流读取的其他部分,因此您通常会遇到此类问题。

首先,“为什么会有这样问题的事件”?

使用Kinesis处理事件是将同时/代码后端处理(分析事件)和前端处理(为最终用户提供服务)的复杂系统分解为系统的两个独立部分的好方法。前端人员可以专注于他们的业务,而后端人员不需要向前端推送代码更改,如果他们想添加功能以满足其分析用例。 Kinesis是一种事件缓冲区,可以打破同步需求并简化业务逻辑代码。

因此,我们希望写入流的事件在其“schema”方面具有灵活性,如果前端团队希望更改事件格式、添加字段、删除字段、更改协议或加密密钥,则应根据需要随时进行更改。

现在需要从流中读取事件的团队能够以高效的方式处理这些灵活的事件,而不是每次发生更改时都会中断其处理。因此,您的Lambda函数将看到它无法处理的事件,而“毒药丸”并不是您所期望的罕见事件。

其次,“如何处理这些问题事件?”

您的Lambda函数将获取要处理的一事件。请注意,您不应逐个获取事件,而是应以大型事件批次获取事件。如果您的批次太小,则流上将很快出现较大的延迟。

对于每个批次,您将遍历事件、处理它们,然后在DynamoDB中进行检查点处理该批次的最后序列标识。 Lambda正在使用(在此处查看更多信息:http://docs.aws.amazon.com/lambda/latest/dg/walkthrough-kinesis-events-adminuser-create-test-function.html)自动完成大部分步骤。

console.log('Loading function');

exports.handler = function(event, context) {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(function(record) {
        // Kinesis data is base64 encoded so decode here
        payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
        console.log('Decoded payload:', payload);
    });
    context.succeed();
};

在“快乐路径”中,如果所有事件都能够没有任何问题地被处理,那么就会发生这种情况。但是如果您在批处理过程中遇到任何问题并且未使用成功通知“提交(commit)”事件,则该批处理将失败,并且您将再次获得该批处理中的所有事件。

现在需要确定处理失败的原因。

  • 临时问题(限流,网络问题等)- 可以等待一秒钟,尝试几次。在很多情况下,问题会自行解决。

  • 偶然问题(内存不足等)- 最好增加Lambda函数的内存分配或减少批量大小。在许多情况下,这种修改会解决问题。

  • 持续故障- 这意味着您要么忽略有问题的事件(将其放入死信队列-DLQ),要么修改您的代码来处理它。

问题在于识别代码中的故障类型并进行不同处理。您需要编写Lambda代码以识别(例如异常类型)并作出不同反应。

您可以使用与CloudWatch的集成,将此类故障写入控制台并创建相关的警报。您还可以使用CloudWatch日志作为记录“死信队列”的方法,查看问题的来源。


2
如果批处理中的某些事件成功,而其他事件失败,您该如何处理?考虑一个使用SES发送每个事件电子邮件的lambda。我可能会收到100个事件的批次,并正确发送前20封电子邮件,但随后SES在其余时间内发生故障。我想报告前20个事件的成功(以便我不会向人们发送垃圾邮件),但我想重试后面的80个事件。这是可能的吗? - Cam Jackson
您可以使用带有查找功能的列表来管理,以避免重复。您可以使用DynamoDB表,其中键为电子邮件,值为最后发送的电子邮件。另一个常见的解决方案是在ElastiCache中使用Redis,并将电子邮件键的TTL设置为一定时间。在发送电子邮件之前,您可以检查上次向其发送电子邮件的时间,并在每次成功发送时更新记录。 - Guy
我和 @CamJackson 面临相同的情况。DynamoDB现在支持TTL,这可能对此有用。 - Ezequiel Moreno
如果消息的顺序不重要,那么将80个失败事件重新插入到同一流中(很快再次重试)或者一个新的“5分钟后重试”的流中是否可行? - aalimovs

0

在您的 Lambda 函数中,您可以选择抛出错误并因此返回整个批处理,或者您可以不抛出错误,而是将其推送到 SQS 队列以便以不同方式处理这些消息。SQS 的保留期为 14 天。您还可以使用每个记录的检查点来知道记录是否在上一次运行中被处理。

如果您有大量的传入数据,并且不想引入任何延迟,您可以忽略错误并继续前进,同时将这些事件添加到 SQQ 队列中。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接