我有一个应用程序会写入DynamoDB数据库表中,现在我想使用Kinesis对我的数据进行聚合,然后将聚合后的数据写入到另一个DynamoDB数据库表中。
我的DynamoDB表启用了Streams,同时我也在该Streams上添加了一个Lambda触发器,代码如下:
'use strict';
var AWS = require('aws-sdk');
var kinesis = new AWS.Kinesis();
exports.handler = (event, context, callback) => {
event.Records.forEach((record) => {
var myValue = record.dynamodb.NewImage.myValue.N;
var partitionKey = record.key.S;
var data = '{"VALUE":"' + myValue + '"}';
var recordParams = {
Data: data,
PartitionKey: partitionKey,
StreamName: 'MyStreamName'
};
console.log('Try Put to Kinesis Stream');
kinesis.putRecord(recordParams, function(err, data) {
if (err) {
console.log('Failed Put');
} else {
console.log('Successful Put');
}
});
});
};
当我在Lambda测试事件中有三个或四个元素时,这会成功地写入我的Kinesis流。
当我启用触发器时,它根本不会写入我的Kinesis流。似乎每次会有大约100个元素进来。在Cloudwatch中,我看到了“尝试将数据放入Kinesis流”的消息,但我甚至没有看到成功/失败的消息。
我是完全错了还是有更好的方法解决这个问题?
如果DynamoDB的流能够直接提供给Kinesis Analytics,那将是我的第一选择 :)
async.mapLimit(event.Records, 5, putIntoKinesis, callback)
或类似的方式。另一个选择是批量将项目放入kinesis,你不必进行大量单独的写操作。 - justin.m.chase