将AWS Lambda数据推送到Kinesis流

19

有没有一种方法可以从Lambda函数向Kinesis流推送数据? 我已经在互联网上搜索过,但没有找到任何相关的示例。

谢谢。

3个回答

17

是的,您可以从Lambda发送信息到Kinesis Stream,并且非常简单。确保您正在使用正确的权限运行Lambda。

  1. 创建一个名为kinesis.js的文件,该文件将提供一个“保存”函数,该函数接收有效负载并将其发送到Kinesis Stream。我们希望能够在任何想要发送数据到流的地方包括此“保存”函数。 代码:

const AWS = require('aws-sdk');
const kinesisConstant = require('./kinesisConstants'); //Keep it consistent
const kinesis = new AWS.Kinesis({
  apiVersion: kinesisConstant.API_VERSION, //optional
  //accessKeyId: '<you-can-use-this-to-run-it-locally>', //optional
  //secretAccessKey: '<you-can-use-this-to-run-it-locally>', //optional
  region: kinesisConstant.REGION
});

const savePayload = (payload) => {
//We can only save strings into the streams
  if( typeof payload !== kinesisConstant.PAYLOAD_TYPE) {
    try {
      payload = JSON.stringify(payload);
    } catch (e) {
      console.log(e);
    }
  }

  let params = {
    Data: payload,
    PartitionKey: kinesisConstant.PARTITION_KEY,
    StreamName: kinesisConstant.STREAM_NAME
  };

  kinesis.putRecord(params, function(err, data) {
    if (err) console.log(err, err.stack);
    else     console.log('Record added:',data);
  });
};

exports.save = (payload) => {
  const params = {
    StreamName: kinesisConstant.STREAM_NAME,
  };

  kinesis.describeStream(params, function(err, data) {
    if (err) console.log(err, err.stack);
    else {
      //Make sure stream is able to take new writes (ACTIVE or UPDATING are good)
      if(data.StreamDescription.StreamStatus === kinesisConstant.STATE.ACTIVE
        || data.StreamDescription.StreamStatus === kinesisConstant.STATE.UPDATING ) {
        savePayload(payload);
      } else {
        console.log(`Kinesis stream ${kinesisConstant.STREAM_NAME} is ${data.StreamDescription.StreamStatus}.`);
        console.log(`Record Lost`, JSON.parse(payload));
      }
    }
  });
};

  1. 创建一个kinesisConstant.js文件来保持一致性 :)

module.exports = {
  STATE: {
    ACTIVE: 'ACTIVE',
    UPDATING: 'UPDATING',
    CREATING: 'CREATING',
    DELETING: 'DELETING'
  },
  STREAM_NAME: '<your-stream-name>',
  PARTITION_KEY: '<string-value-if-one-shard-anything-will-do',
  PAYLOAD_TYPE: 'String',
  REGION: '<the-region-where-you-have-lambda-and-kinesis>',
  API_VERSION: '2013-12-02'
}

  • 您的处理程序文件:我们添加了“done”函数,以向想要将数据发送到流的任何人发送响应,但是“kinesis.save(event)”完成所有工作。
  • const kinesis = require('./kinesis');
    
    exports.handler = (event, context, callback) => {
      console.log('LOADING handler');
      
      const done = (err, res) => callback(null, {
        statusCode: err ? '400' : '200',
        body: err || res,
        headers: {
          'Content-Type': 'application/json',
        },
      });
      
      kinesis.save(event); // here we send it to the stream
      done(null, event);
    }


    1
    很好的例子,对我帮助很大。请纠正一下打字错误(小)- kinesisConstants.js - 您错过了文件名中的最后一个“s”。 - Boris Bolshem
    这是一个很好的例子。但我不禁想知道,使用Kinesis的优势是什么。 如果你正在向Lambda传递数据,为什么不直接将其推送到存储(例如:ElasticSearch)中呢? 在大多数AWS存储工具都有自己的Node lib时,为什么要使用第三方工具?这样做更有效吗? - ChristoKiwi
    使用这段代码,如果发送的事件数量超过了DescribeStream方法的极限(10个请求/秒),则会收到LimitExceededError的错误提示。因此,最好删除DescribeStream调用。 - pHiL

    5

    这个操作和在你的电脑上做一样。

    以下是一个nodejs的示例:

    let aws = require('aws');
    let kinesis = new aws.Kinesis();
    
    // data that you'd like to send
    let data_object = { "some": "properties" };
    let data = JSON.stringify(data_object);
    
    // push data to kinesis
    const params = {
      Data: data,
      PartitionKey: "1",
      StreamName: "stream name"
    }
    
    kinesis.putRecord(params, (err, data) => {
      if (err) console.error(err);
      else console.log("data sent");
    }
    

    请注意,这段代码将不能生效,因为Lambda没有访问您的流的权限。 当通过Lambda访问AWS资源时,最好使用IAM角色;
    1. 在配置新的Lambda时,可以选择现有角色或创建一个新角色。
    2. 转到IAM,然后选择角色(Roles),并选择您为Lambda函数分配的角色名称。
    3. 添加相关权限(putRecordputRecords)。
    然后,测试Lambda

    2

    是的,这是可以做到的。我也曾尝试过相同的事情,并能够使用Node.js 4.3运行时在Lambda中完成,它还适用于版本6.10。

    以下是代码:

    在Lambda函数的顶部声明以下内容:

    var AWS = require("aws-sdk");
    var kinesis = new AWS.Kinesis();
    function writeKinesis(rawdata){
        data = JSON.stringify(rawdata);
        params = {Data: data, PartitionKey: "<PARTITION_KEY>", StreamName: "<STREAM_NAME>"};
        kinesis.putRecord(params, (err, data) => {
        if (err) console.error(err);
        else console.log("data sent");
        });  
    }
    

    现在,在 exports.handler 中调用该函数:
    writeKinesis(<YOUR_DATA>);
    

    需要注意的几点...为了让Kinesis能够摄取数据,数据必须经过编码处理。在下面的例子中,我有一个函数从CloudWatch获取日志,并将它们发送到Kinesis流中。

    请注意,我正在将buffer.toString('utf8')的内容插入到writeKinesis函数中:

    exports.handler = function(input, context) {
        ...
        var zippedInput = new Buffer(input.awslogs.data, 'base64');
        zlib.gunzip(zippedInput, function(error, buffer) {
            ...
            writeKinesis(buffer.toString('utf8'));
            ...
        }
        ...
    }
    

    最后,在IAM中配置适当的权限。您的Lambda函数必须在包含以下权限的IAM角色的上下文中运行。在我的情况下,我只是修改了默认的lambda_elasticsearch_execution角色,添加了一个名为“lambda_kinesis_execution”的策略,代码如下:

    "Effect": "Allow",
    "Action": [
        "kinesis:*"
    ],
    "Resource": [
        "<YOUR_STREAM_ARN>"
    ]
    

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接