DynamoDB 转换为 Kinesis 数据流

3

我有一个应用程序会写入DynamoDB数据库表中,现在我想使用Kinesis对我的数据进行聚合,然后将聚合后的数据写入到另一个DynamoDB数据库表中。

我的DynamoDB表启用了Streams,同时我也在该Streams上添加了一个Lambda触发器,代码如下:

'use strict';

var AWS = require('aws-sdk');
var kinesis = new AWS.Kinesis();

exports.handler = (event, context, callback) => {
    event.Records.forEach((record) => {

        var myValue = record.dynamodb.NewImage.myValue.N;
        var partitionKey = record.key.S;
        var data = '{"VALUE":"' + myValue + '"}';

        var recordParams = {
            Data: data,
            PartitionKey: partitionKey,
            StreamName: 'MyStreamName'
        };

        console.log('Try Put to Kinesis Stream');

        kinesis.putRecord(recordParams, function(err, data) {
            if (err) {
                console.log('Failed Put');
            } else {
                console.log('Successful Put');
            }
        });
    });
};

当我在Lambda测试事件中有三个或四个元素时,这会成功地写入我的Kinesis流。
当我启用触发器时,它根本不会写入我的Kinesis流。似乎每次会有大约100个元素进来。在Cloudwatch中,我看到了“尝试将数据放入Kinesis流”的消息,但我甚至没有看到成功/失败的消息。
我是完全错了还是有更好的方法解决这个问题?
如果DynamoDB的流能够直接提供给Kinesis Analytics,那将是我的第一选择 :)
2个回答

4
你的错误在于lambda函数没有等待所有kinesis.putRecord调用完成。
在Node.js中,你有一个回调函数的编程模型。你发起一个异步请求,当请求完成时,回调函数被调用。因此,在函数返回时,请求并未完成。只有在回调函数被调用时,请求才算完成。
问题的两个解决方案:
自己跟踪已调用的回调函数
'use strict';
var AWS = require('aws-sdk');
var kinesis = new AWS.Kinesis();
exports.handler = (event, context, callback) => {
    event.Records.forEach((record) => {
        var myValue = record.dynamodb.NewImage.myValue.N;
        var partitionKey = record.key.S;
        var data = '{"VALUE":"' + myValue + '"}';
        var recordParams = {
            Data: data,
            PartitionKey: partitionKey,
            StreamName: 'MyStreamName'
        };
        console.log('Try Put to Kinesis Stream');
        var i = 0;
        kinesis.putRecord(recordParams, function(err, data) {
            if (err) {
                console.log('Failed Put');
                i = event.Records.length;
            } else {
                console.log('Successful Put');
                i += 1;
            }
            if (i === event.Records.length) {
                console.log('All done');
                callback(err);
            }
        });
    });
};

或者使用像async这样的库:https://www.npmjs.com/package/async


这对于小数据对象'{"VALUE":"12345"}'有效,但是一旦我添加了更多属性(我的对象有>6),则其中一些记录甚至没有记录被写入流中。 我在周末禁用了Kinesis应用程序,现在再次尝试似乎可以工作。 我现在唯一的担忧是它会再次发生,并且不是所有记录都被传递到流中。 - intanethi
它运行了4个小时,然后Kinesis流中根本没有写入任何内容。我增加了函数的超时时间,这似乎有所帮助,但看起来在第一个超时请求之后,每个后续请求也都超时了。不确定为什么,因为大多数批处理都非常小。 - intanethi
你应该使用async库。在一个数组上执行forEach循环时进行异步调用是不安全的。你需要使用async库来协调整个过程,可以这样做:async.mapLimit(event.Records, 5, putIntoKinesis, callback)或类似的方式。另一个选择是批量将项目放入kinesis,你不必进行大量单独的写操作。 - justin.m.chase

0

在我看来,除了需要根据 hellomichibye 的建议调用 callback 之外,您整体问题的一部分以及您在评论中描述的行为可能源于您构建 Data 值的方式。尝试使用 JSON.stringify 而不是手动创建 Data 的 JSON 字符串,这样您就知道输入始终会被正确格式化。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接