Kinesis Lambda Consumer 最小批处理大小

7
我正在使用AWS Lambda(node.js)作为AWS Kinesis消费者。我可以看到可以设置最大批处理大小,但我想知道是否可以设置最小批处理大小,以便确保每个Lambda将处理至少50条(或任何数量的)记录。
我希望有一个最小的批次大小,因为Lambda消费者将与RDS MySQL实例建立连接,而我正在尝试保持并发连接数低。
如果没有设置最小值的配置功能,则欢迎提供任何解决方法的想法。
谢谢。
2个回答

2
一种方法是使用Kinesis Firehose,它基于传递流的缓冲配置将多个传入记录连接在一起。
以下是需要翻译的内容:
  1. 将数据发送到Firehose - 直接使用API将记录放入Firehose流中,或将Firehose附加到现有的Kinesis流。
  2. 将S3设置为Firehose的目的地 - 这将汇总您的单个记录,并将它们作为单个对象放入S3。您可以指定分隔符,甚至可以对单个记录进行转换Lambda函数。
  3. 监听S3:PutObject - 将Lambda附加到监听从Firehose流接收这些聚合记录的S3存储桶。

这种方式在高吞吐量系统上的性能非常差,因为它会生成大量的小文件和许多难以监控且服务水平协议(SLA)不确定的S3触发器。 - Remi D

0

我有一个问题,你在流中打开了多少个分片?每个分片同时只能执行1个lambda实例。因此,如果你只有1个分片,那么你每次只会有1个lambda访问你的RDS实例。

你有数据表明这是一个问题吗?

接下来是一个可能不可靠的hack,在生产环境中应该避免使用。

对于最小批处理大小,你可以从你的node.js lambda函数返回一个error,如果批处理大小小于你期望的记录数。

例如:

handler(event, context, callback) {
  const records = event.Records;
  if (records.length() < minBatchSize) {
    callback('insufficient batch size');
  } else {
    processRecords(records, callback);
  }
}

但是有两个问题需要考虑:

1)如果在你的流上配置了最大事件时间限制,那么你不能无限期地执行此操作,否则可能会丢失数据。此后,记录将从流中消失。请注意,您需要额外付费才能使用此功能(请参阅extended data retention)。

您可以从 Lambda 或 Kinesis Shard 迭代器年龄指标推断出批次年龄,请参见http://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html

我不确定这对于拥有多个 Shard 的情况是否可靠,但是例如……

handler(event, context, callback) {
  const records = event.Records;
  if (records.length() < minBatchSize) {
    if (calculateLambdaAge() > tooLongDelayThreshold) {
      processRecords(records, callback);
    } else {
      callback(new Error('insufficient batch size'));
    }
  } else {
    processRecords(records, callback);
  }
}

calculateLambdaAge() {
  // interrogate cloudwatch
}

如果CloudWatch无法提供信息,您可能需要在至少与您的RDS(Redis / Dynamo)一样可扩展的地方跟踪它。
2)与其将精力投入到使#1可靠上,不如将额外的努力放在扩大您的RDS实例上,使您当前的使用更加高效。

在编写代码示例时,我参考了thisthis


1
根据文档所述,返回错误会使Lambda重新处理相同的批次... - Remi D

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接