我有一个问题,你在流中打开了多少个分片?每个分片同时只能执行1个lambda实例。因此,如果你只有1个分片,那么你每次只会有1个lambda访问你的RDS实例。
你有数据表明这是一个问题吗?
接下来是一个可能不可靠的hack,在生产环境中应该避免使用。
对于最小批处理大小,你可以从你的node.js lambda函数返回一个error
,如果批处理大小小于你期望的记录数。
例如:
handler(event, context, callback) {
const records = event.Records;
if (records.length() < minBatchSize) {
callback('insufficient batch size');
} else {
processRecords(records, callback);
}
}
但是有两个问题需要考虑:
1)如果在你的流上配置了最大事件时间限制,那么你不能无限期地执行此操作,否则可能会丢失数据。此后,记录将从流中消失。请注意,您需要额外付费才能使用此功能(请参阅extended data retention)。
您可以从 Lambda 或 Kinesis Shard 迭代器年龄指标推断出批次年龄,请参见http://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html。
我不确定这对于拥有多个 Shard 的情况是否可靠,但是例如……
handler(event, context, callback) {
const records = event.Records;
if (records.length() < minBatchSize) {
if (calculateLambdaAge() > tooLongDelayThreshold) {
processRecords(records, callback);
} else {
callback(new Error('insufficient batch size'));
}
} else {
processRecords(records, callback);
}
}
calculateLambdaAge() {
}
如果CloudWatch无法提供信息,您可能需要在至少与您的RDS(Redis / Dynamo)一样可扩展的地方跟踪它。
2)与其将精力投入到使#1可靠上,不如将额外的努力放在扩大您的RDS实例上,使您当前的使用更加高效。
在编写代码示例时,我参考了this和this。