将AWS Kinesis Firehose中的Parquet格式数据写入AWS S3

32

我想将以parquet格式的数据从Kinesis Firehose导入S3。到目前为止,我只找到了一个需要创建EMR的解决方案,但我正在寻找更便宜和更快的方式,比如直接从Firehose将接收到的JSON存储为parquet,或者使用Lambda函数。

非常感谢, Javi。

3个回答

30

好消息,此功能今天发布了!

Amazon Kinesis Data Firehose可以在将数据存储到Amazon S3之前,将输入数据的格式从JSON转换为Apache Parquet或Apache ORC。Parquet和ORC是列式数据格式,可节省空间并加快查询速度。

要启用,请转到您的Firehose流并单击编辑。您应该会看到如下屏幕截图所示的记录格式转换部分:

输入图像描述

有关详细信息,请参阅文档:https://docs.aws.amazon.com/firehose/latest/dev/record-format-conversion.html


24

在使用AWS支持服务和数百个不同的实现之后,我想解释一下我取得了什么成果。

最终,我创建了一个Lambda函数来处理由Kinesis Firehose生成的每个文件,根据负载对我的事件进行分类,并将结果存储在S3中的Parquet文件中。

这并不容易:

  1. 首先,您应该创建一个Python虚拟环境,其中包括所有所需的库(在我的情况下为Pandas、NumPy、Fastparquet等)。由于生成的文件(其中包括所有库和我的Lambda函数)很大,因此需要启动EC2实例,我已经使用了免费层中提供的实例。按照以下步骤创建虚拟环境:

    • 登录EC2
    • 创建一个名为lambda(或任何其他名称)的文件夹
    • Sudo yum -y update
    • Sudo yum -y upgrade
    • sudo yum -y groupinstall“Development Tools”
    • sudo yum -y install blas
    • sudo yum -y install lapack
    • sudo yum -y install atlas-sse3-devel
    • sudo yum install python27-devel python27-pip gcc
    • Virtualenv env
    • source env/bin/activate
    • pip install boto3
    • pip install fastparquet
    • pip install pandas
    • pip install thriftpy
    • pip install s3fs
    • pip install(其他所需的库)
    • find ~/lambda/env/lib*/python2.7/site-packages/ -name“*.so”| xargs strip
    • pushd env/lib/python2.7/site-packages/
    • zip -r -9 -q ~/lambda.zip *
    • Popd
    • pushd env/lib64/python2.7/site-packages/
    • zip -r -9 -q ~/lambda.zip *
    • Popd
  2. 正确创建lambda_function:

  3. import json
    import boto3
    import datetime as dt
    import urllib
    import zlib
    import s3fs
    from fastparquet import write
    import pandas as pd
    import numpy as np
    import time
    
    def _send_to_s3_parquet(df):
        s3_fs = s3fs.S3FileSystem()
        s3_fs_open = s3_fs.open
        # FIXME add something else to the key or it will overwrite the file 
        key = 'mybeautifullfile.parquet.gzip'
        # Include partitions! key1 and key2
        write( 'ExampleS3Bucket'+ '/key1=value/key2=othervalue/' + key, df,
                compression='GZIP',open_with=s3_fs_open)            
    
    def lambda_handler(event, context):
        # Get the object from the event and show its content type
        bucket = event['Records'][0]['s3']['bucket']['name']
        key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'])
        try:
            s3 = boto3.client('s3')
            response = s3.get_object(Bucket=bucket, Key=key)
            data = response['Body'].read()
            decoded = data.decode('utf-8')
            lines = decoded.split('\n')
            # Do anything you like with the dataframe (Here what I do is to classify them 
            # and write to different folders in S3 according to the values of
            # the columns that I want
            df = pd.DataFrame(lines)
            _send_to_s3_parquet(df)
        except Exception as e:
            print('Error getting object {} from bucket {}.'.format(key, bucket))
            raise e
    
  4. 将lambda函数复制到lambda.zip,并部署lambda_function:

    • 返回您的EC2实例并将所需的lambda函数添加到zip文件中:zip -9 lambda.zip lambda_function.py(lambda_function.py是步骤2生成的文件)
    • 将生成的zip文件复制到S3,因为直接部署非常重且容易失败。 aws s3 cp lambda.zip s3://support-bucket/lambda_packages/
    • 部署lambda函数:aws lambda update-function-code --function-name --s3-bucket support-bucket --s3-key lambda_packages/lambda.zip
  5. 在需要时触发执行,例如每次在S3中创建新文件,甚至可以将lambda函数关联到Firehose。(我没有选择此选项,因为'lambda'限制比Firehose限制低,您可以将Firehose配置为每128Mb或15分钟写入一个文件,但如果将此lambda函数关联到Firehose,则每3分钟或5MB执行一次该lambda函数,在我的情况下,我遇到了生成大量小parquet文件的问题,因为每次启动lambda函数时,我至少会生成10个文件)。


我理解的是否正确,这个管道会为每条记录创建一个Parquet文件?由于Parquet是一种列式存储,那么需要某种单独的压缩作业来将这些小的Parquet文件合并成一个较大的文件吗? - Tagar
1
嗨@Tagar,每次调用lambda_handler时,它都会写入一个parquet文件,并且可以进行配置,例如您可以将其配置为每15分钟启动一次,这将在每15分钟内创建一个包含此时间段内接收到的所有事件的文件。 - bracana

11

Amazon Kinesis Firehose接收流记录并可以将它们存储在Amazon S3(或Amazon Redshift或Amazon Elasticsearch Service)中。

每个记录可以最多达到1000KB。

Kinesis流程图

但是,记录会被附加在一起形成文本文件,并根据时间或大小进行分批处理。传统上,记录采用JSON格式。

您将无法发送parquet文件,因为它不符合此文件格式。

触发Lambda数据转换功能是可能的,但它也不能输出parquet文件。

事实上,考虑到parquet文件的性质,很难逐条构建它们。作为列存储格式,它们似乎确实需要批量创建,而不是每条记录追加数据。

底线:不行。


1
嗨@Javi,如果这个或任何答案解决了你的问题,请考虑通过点击复选框接受它。这向更广泛的社区表明你已经找到了解决方案,并为回答者和你自己赢得了一些声誉。没有义务这样做。 - John Rotenstein
@JohnRotenstein,您能否使用Lambda对Firehose中每个缓冲时间/大小批次进行转换,并稍后将Parquet文件连接在一起,每隔几个小时或更长时间的时间内形成更大的文件?这样可以通过Firehose将JSON流式传输到Parquet,以便在Athena中获得接近实时的数据,并仍然获得Parquet的性能优势。 - chris.mclennon
@cmclen,Parquet是一种列式文件格式。我认为你不能只逐行追加数据,这将违背使用Parquet的初衷。 - John Rotenstein
@JohnRotenstein 在12天前之前,您不能依赖Firehose将转换后的数据倾倒到S3中,但是您可以像bracana指出的那样使用S3FS或类似工具自己编写文件。如果您希望它们显示为已成功,则只需为Firehose返回格式正确的行(通常只需添加processed_at时间戳并按原样返回输入行)。如果您不依赖于pandas,也可以直接在lambda中执行此操作,因为该库太大而无法将其打包到Lambda中(最大50MB)。 - bluu

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接