我能自动将换行符附加到AWS Firehose记录吗?

21

我正在尝试使用以下设置配置Kinesis Analytics应用程序:

  • 输入流是一个Kinesis Firehose,它正在接收字符串化的JSON值
  • SQL是一个简单的pass-through(它需要更复杂的操作,但为了测试,它只是传递数据)
  • 输出流是第二个Kinesis Firehose,将记录传递到S3存储桶

稍后,我将使用Hive + JSONSERDE导入S3存储桶的内容,它希望每个JSON记录都在自己的行上。Firehose输出只附加所有JSON记录,这会破坏JSONSERDE。

可以将AWS Lambda数据格式化程序附加到输出流中,但那似乎很昂贵。我想要的只是使用换行符分隔每个记录。

如果我没有Analytics应用程序,我会将换行符附加到每个Firehose记录。似乎奇怪的是,在应用程序的SQL中没有办法做到这一点:

CREATE OR REPLACE STREAM "STREAM_OUT" (
  a VARCHAR(4),
  b VARCHAR(4),
  c VARCHAR(4)
);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  INSERT INTO "STREAM_OUT"
    SELECT STREAM
      "a",
      "b",
      "c"
    FROM "SOURCE_SQL_STREAM_001";

最好的解决方案是添加Lambda数据格式化器吗?我真的很想避免这种情况。

4个回答

4

我有一个类似的需求,需要向生成的firehose文件添加新行。在我们的应用程序中,通过API Gateway调用firehose。

这在集成请求部分的Body Mapping Templates中指定。

以下命令在API Gateway中生成换行符以将数据发送到kinesis firehose记录中。

方法1:

    #set($payload="$input.path('$.Record.Data')
")
        {
            "DeliveryStreamName": "$input.path('$.DeliveryStreamName')",
            "Record": {
            "Data": "$util.base64Encode($payload)"
        }
        }

如果您通过API Gateway调用firehose,则这将完美地运作。
谢谢和问候, Srivignesh KN

这适用于Firehose,但不适用于Analytics App。Analytics App会剥离其输出中的换行符。 - MrHen
你可以尝试在Firehose内进行数据转换,并使用Lambda函数添加新行,然后让Kinesis将其传递到S3。 - Srivignesh KN
3
我在问题中提到了这一点。我不想仅仅为了这个目的添加一个Lambda转换器。 - MrHen
1
你有没有找到其他添加新行分隔符的方法?上周五,AWS向一些地区发布了新的更新,这破坏了你的解决方案。现在添加Cg==也没有帮助。如果你添加Cg==,大多数情况下现在会抛出SerializationException。例如,这段代码现在不再起作用:{ "DeliveryStreamName": "fus-bear-csv-dev", "Records": [ { "Data": "$util.base64Encode('a')Cg==" } ] }转换后的方法响应体:{"__type":"SerializationException"} - Andrey Cheptsov
谢谢您让我知道,您能告诉我您遇到问题的地区吗?我会尝试从那个地区,并发布一个替代方案。在美国东部和美国西部地区,这对我来说完全正常。 - Srivignesh KN
显示剩余3条评论

4

我发布了这个回答,只是为了让问题更新最近AWS的公告。AWS最近宣布Kinesis Firehose Delivery流现在支持动态分区。它支持每个记录添加新行字符。更多信息请参见这里这里


2
以下是我们实现的一个基本示例。我们使用JavaScript将记录放入Kinesis流中,并使用Firehose将其重定向到具有gzip压缩的s3位置。稍后,Athena将从s3位置查询以获取来自s3的记录。
以下是在使用JavaScript代码发送到Kinesis流之前添加新行的代码。
var payload = JSON.parse(payload);  
finalData = JSON.stringify(payload)+"\n";

var kinesisPayload = {};    
kinesisPayload.Data = finalData;    
kinesisPayload.StreamName = "kinesisStreamName");    
kinesisPayload.PartitionKey = "124";

1
我猜这在Analytics应用中不起作用。在原始记录中添加换行符并不难,但是Analytics应用在将它们发送到Firehose之前会将其剥离。 - MrHen

2

使用Python或Node.js的解决方案

我正在使用DynamoDB Streams,并且需要将这些记录保存到S3中。我实现了一个Kinesis Firehose流以及一个Lambda函数。这可以将我的记录作为JSON字符串保存到S3中,但是,保存到S3文件中的每个记录都是内联的,也就是说,在一行中连续存在,因此我需要在添加每个记录时在其末尾添加一个新行,以便每个记录都在自己的一行上。对于我的解决方案,我最终必须进行一些base64解码/编码。

以下是我执行此操作的步骤:

  1. 创建Kinesis Firehose流时,请启用“使用AWS Lambda转换源记录”(选择“已启用”)。如果您已经创建了流,则仍然可以通过编辑现有流来启用此功能。
  2. 此时,您需要选择另一个执行此转换的Lambda函数。在我的情况下,我需要在每个记录的末尾添加一个新行,以便在文本编辑器中打开文件并查看时,每个条目都在单独的一行上。

以下是我用于第二个Lambda的Python和Node.js的测试解决方案代码:

添加新行的Python解决方案:

import json
import boto3
import base64

output = []

def lambda_handler(event, context):
    
    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        print('payload:', payload)
        
        row_w_newline = payload + "\n"
        print('row_w_newline type:', type(row_w_newline))
        row_w_newline = base64.b64encode(row_w_newline.encode('utf-8'))
        
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': row_w_newline
        }
        output.append(output_record)

    print('Processed {} records.'.format(len(event['records'])))
    
    return {'records': output}

添加换行的Node.js解决方案:

'use strict';
console.log('Loading function');

exports.handler = (event, context, callback) => {

   
    /* Process the list of records and transform them */
    const output = event.records.map((record) => {
        
        let entry = (new Buffer(record.data, 'base64')).toString('utf8');
        let result = entry + "\n"
        const payload = (new Buffer(result, 'utf8')).toString('base64');
            
            return {
                recordId: record.recordId,
                result: 'Ok',
                data: payload,
            };
            
    });
    console.log(`Processing completed.  Successful records ${output.length}.`);
    callback(null, { records: output });
};

以下是帮助我整理Python版本的一些好参考资料:

在上面的问题中,MrHen想要在不使用第二个Lambda的情况下完成此操作。我能够在第一个Lambda中使其工作,而不是使用Kinesis Firehose转换源记录功能。我按照以下顺序对来自DynamoDB的newImage进行了编码、解码、添加新行("\n")、编码和解码处理。可能有更简洁的方法。我选择使用第二个Lambda函数作为转换源记录功能,因为目前它对我来说似乎更加清晰。

在我的情况下,单个Lambda解决方案如下:

 # Not pretty, but it works! Successfully adds new line to record.
 # newImage comes from the DynamoDB Stream as a Python dictionary object,
 # I convert it to a string before running the code below.

    newImage = base64.b64encode(newImage.encode('utf-8'))
    newImage = base64.b64decode(newImage).decode('utf-8')
    newImage = newImage + "\n"
    newImage = base64.b64encode(newImage.encode('utf-8'))
    newImage = base64.b64decode(newImage).decode('utf-8')

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接