这是有关在Kinesis中处理事件的常见问题,我将尝试为您提供一些要点,以构建Lambda函数来处理带有“损坏”数据的问题。由于最佳实践应将系统的不同部分分别写入Kinesis流和从Kinesis流读取的其他部分,因此您通常会遇到此类问题。
首先,“为什么会有这样问题的事件”?
使用Kinesis处理事件是将同时/代码后端处理(分析事件)和前端处理(为最终用户提供服务)的复杂系统分解为系统的两个独立部分的好方法。前端人员可以专注于他们的业务,而后端人员不需要向前端推送代码更改,如果他们想添加功能以满足其分析用例。 Kinesis是一种事件缓冲区,可以打破同步需求并简化业务逻辑代码。
因此,我们希望写入流的事件在其“schema”方面具有灵活性,如果前端团队希望更改事件格式、添加字段、删除字段、更改协议或加密密钥,则应根据需要随时进行更改。
现在需要从流中读取事件的团队能够以高效的方式处理这些灵活的事件,而不是每次发生更改时都会中断其处理。因此,您的Lambda函数将看到它无法处理的事件,而“毒药丸”并不是您所期望的罕见事件。
其次,“如何处理这些问题事件?”
您的Lambda函数将获取要处理的一批事件。请注意,您不应逐个获取事件,而是应以大型事件批次获取事件。如果您的批次太小,则流上将很快出现较大的延迟。
对于每个批次,您将遍历事件、处理它们,然后在DynamoDB中进行检查点处理该批次的最后序列标识。 Lambda正在使用(在此处查看更多信息:http://docs.aws.amazon.com/lambda/latest/dg/walkthrough-kinesis-events-adminuser-create-test-function.html)自动完成大部分步骤。
console.log('Loading function');
exports.handler = function(event, context) {
console.log(JSON.stringify(event, null, 2));
event.Records.forEach(function(record) {
payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
console.log('Decoded payload:', payload);
});
context.succeed();
};
在“快乐路径”中,如果所有事件都能够没有任何问题地被处理,那么就会发生这种情况。但是如果您在批处理过程中遇到任何问题并且未使用成功通知“提交(commit)”事件,则该批处理将失败,并且您将再次获得该批处理中的所有事件。
现在需要确定处理失败的原因。
临时问题(限流,网络问题等)- 可以等待一秒钟,尝试几次。在很多情况下,问题会自行解决。
偶然问题(内存不足等)- 最好增加Lambda函数的内存分配或减少批量大小。在许多情况下,这种修改会解决问题。
持续故障- 这意味着您要么忽略有问题的事件(将其放入死信队列-DLQ),要么修改您的代码来处理它。
问题在于识别代码中的故障类型并进行不同处理。您需要编写Lambda代码以识别(例如异常类型)并作出不同反应。
您可以使用与CloudWatch的集成,将此类故障写入控制台并创建相关的警报。您还可以使用CloudWatch日志作为记录“死信队列”的方法,查看问题的来源。