How do I get the data out of an AWS Kinesis data stream event?

6

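I'm writing a Python Lambda function that consumes an AWS Kinesis data stream, but I'm having trouble understanding the format of the Kinesis record event. For example: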

{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
                "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                "approximateArrivalTimestamp": 1545084711.166
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}

Source: Using AWS Lambda with Amazon Kinesis

Where in this object is the data I originally put on the Kinesis stream, and how do I access it?

1 Answer

17

The data you put on the stream appears under each record's kinesis.data key as a Base64-encoded string. For example (truncated):

{
    "Records": [
        {
            "kinesis": {
                ...
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                ...
            },
            ...
        },
        {
            "kinesis": {
                ...
                "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                ...
            },
            ...
        }
    ]
}

To access the data, loop over each object in Records and Base64-decode its kinesis.data value.

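import base64


def lambda_handler(event, context):
    for record in event["Records"]:
        # The payload arrives Base64-encoded under kinesis.data;
        # decode it to bytes, then interpret those bytes as UTF-8 text.
        decoded_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")

        print(decoded_data)
        # Record 1: Hello, this is a test.
        # Record 2: This is only a test.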

Note: This example assumes that the data sent to the Kinesis stream was UTF-8 encoded before Kinesis Base64-encoded it.
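
If you aren't sure every payload is UTF-8 text, one option is to decode in stages and fall back gracefully. The following is only a sketch, not part of the AWS documentation: the helper names make_test_event, decode_records and decode_payload are hypothetical, and the fallback order (JSON, then plain text, then raw bytes) is an assumption about what a consumer might want.

```python
import base64
import json


def make_test_event(payloads):
    """Build a minimal Kinesis-style event from raw byte payloads.

    Hypothetical helper for local testing; it only fills in kinesis.data,
    not the other fields a real event carries.
    """
    return {
        "Records": [
            {"kinesis": {"data": base64.b64encode(p).decode("ascii")}}
            for p in payloads
        ]
    }


def decode_records(event):
    """Return each record's payload as raw bytes (Base64 decoding only)."""
    return [base64.b64decode(r["kinesis"]["data"]) for r in event["Records"]]


def decode_payload(raw):
    """Interpret bytes as JSON if possible, else UTF-8 text, else raw bytes."""
    try:
        text = raw.decode("utf-8")
    except UnicodeDecodeError:
        # Not UTF-8 at all: hand the bytes back untouched.
        return raw
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Valid UTF-8 but not JSON: treat it as plain text.
        return text


if __name__ == "__main__":
    event = make_test_event([b"Hello, this is a test.", b'{"id": 1}', b"\xff\xfe"])
    for raw in decode_records(event):
        print(repr(decode_payload(raw)))
```

Keeping the Base64 step separate from the text step means a binary payload (for example compressed or serialized data) still comes through intact instead of raising inside the loop. Note that a plain-text payload that happens to be valid JSON, such as "123", would be parsed as JSON under this fallback order.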

