我可以在将数据交付到S3之前自定义Kinesis Firehose中的分区吗?

21

我有一个Firehose流,旨在从不同来源和不同事件类型中摄取数百万个事件。该流应将所有数据传递到一个S3存储桶中,作为未经处理的原始数据。

我考虑根据嵌入在事件消息中的元数据,如事件源、事件类型和事件日期,在S3中对这些数据进行分区。

然而,Firehose遵循其默认的基于记录到达时间的分区方式。是否可以自定义此分区行为以符合我的需求?

更新:已接受的答案已更新,因为有一个新答案表明,该功能从2021年9月开始提供。


@JohnRotenstein 很遗憾,这些答案并没有回答问题。两个建议都是附加一个lambda函数,根据特定的ID将传入的数据路由到不同的流中。这个问题和其他问题都在讨论是否可以为firehose定义分区方法。 谢谢您提供的参考资料!! - mowienay
6个回答

15

写这篇文章时,Vlad提到的动态分区功能仍然很新。我需要它成为CloudFormation模板的一部分,但它还没有得到适当的文档支持。我不得不添加DynamicPartitioningConfiguration才能使其正常工作。MetadataExtractionQuery语法也没有得到适当的文档支持。

  MyKinesisFirehoseStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    ...
    Properties:
      ExtendedS3DestinationConfiguration:
        Prefix: "clients/client_id=!{client_id}/dt=!{timestamp:yyyy-MM-dd}/"
        ErrorOutputPrefix: "errors/!{firehose:error-output-type}/"
        DynamicPartitioningConfiguration:
          Enabled: "true"
          RetryOptions:
            DurationInSeconds: "300"
        ProcessingConfiguration:
          Enabled: "true"
          Processors:
            - Type: AppendDelimiterToRecord
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: "{client_id:.client_id}"
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6


5
现在是2022年4月,我仍然找不到有关MetadataExtractionQuery的文档。这个答案真的很有帮助,让我弄清了语法。谢谢! - BjornO
这里有一些例子:https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesisfirehose-deliverystream.html,但与此答案中的简单分区相比,它们相当不清晰。 - KC54
很高兴有一个可工作的示例!不过,我不得不写前缀:"clients/client_id=!{partitionKeyFromQuery:client_id}/dt=!{timestamp:yyyy-MM-dd}/",也许对其他人有用。 - Gabriele
很高兴有一个可工作的示例!不过,我不得不写前缀:"clients/client_id=!{partitionKeyFromQuery:client_id}/dt=!{timestamp:yyyy-MM-dd}/",也许对其他人有用。 - undefined
对于其他人无法立即弄清楚的情况:UI界面中的“动态分区键”对应于模板中的“ProcessingConfiguration”。 - Gabriele
对于其他人无法立即弄清楚的情况:UI中的“动态分区键”对应于模板中的“ProcessingConfiguration”。 - undefined

12
自2021年9月1日起,AWS Kinesis Firehose支持此功能。请在此处阅读发布的公告博客文章。
从文档中可以看到:
您可以使用“键”和“值”字段来指定要用作动态分区键的数据记录参数以及生成动态分区键值的jq查询。...
以下是UI中的样子: enter image description here enter image description here

这太棒了! - Dmitry

6

不行,你不能基于事件内容进行“分区”。

一些选项是:

  • 发送到单独的Firehose流
  • 发送到Kinesis数据流(而不是Firehose),并编写自己的自定义Lambda函数来处理和保存数据(参见:AWS开发者论坛:Athena和Kinesis Firehose
  • 使用Kinesis Analytics处理消息并将其“直接”发送到不同的Firehose流

如果您要将输出与Amazon Athena或Amazon EMR一起使用,则还可以考虑将其转换为Parquet格式,这具有更好的性能。这需要对S3中的数据进行批处理后处理,而不是在流中到达时转换数据。


3
为了补充Murali的回答,我们已经在CDK中实现了它:
我们传入的JSON数据大致如下:
{
    "data": 
        {
        "timestamp":1633521266990,
        "defaultTopic":"Topic",
        "data":
        {
            "OUT1":"Inactive",
            "Current_mA":3.92
        }
    }
}

CDK代码如下所示:
const DeliveryStream = new CfnDeliveryStream(this, 'deliverystream', {
  deliveryStreamName: 'deliverystream',
  extendedS3DestinationConfiguration: {
    cloudWatchLoggingOptions: {
      enabled: true,
    },
    bucketArn: Bucket.bucketArn,
    roleArn: deliveryStreamRole.roleArn,
    prefix: 'defaultTopic=!{partitionKeyFromQuery:defaultTopic}/!{timestamp:yyyy/MM/dd}/',
    errorOutputPrefix: 'error/!{firehose:error-output-type}/',
    bufferingHints: {
      intervalInSeconds: 60,
    },
    dynamicPartitioningConfiguration: {
      enabled: true,
    },
    processingConfiguration: {
      enabled: true,
      processors: [
        {
          type: 'MetadataExtraction',
          parameters: [
            {
              parameterName: 'MetadataExtractionQuery',
              parameterValue: '{Topic: .data.defaultTopic}',
            },
            {
              parameterName: 'JsonParsingEngine',
              parameterValue: 'JQ-1.6',
            },
          ],
        },
        {
          type: 'AppendDelimiterToRecord',
          parameters: [
            {
              parameterName: 'Delimiter',
              parameterValue: '\\n',
            },
          ],
        },
      ],
    },
  },
})

你知道如何将2个字段用作2个单独的值吗? - ArielB
3
带有两个字段的示例参数: { parameterName: 'MetadataExtractionQuery', parameterValue: '{Topic:.data.defaultTopic,out1:.data.data.OUT1}', } 该参数的含义是:元数据提取查询。其中,'parameterName'为参数名称,'parameterValue'为参数值。参数值中包含两个字段,即'Topic'和'out1',并对应了相应的值。':.data.defaultTopic'和':.data.data.OUT1'表示这些值来自于其他数据源。 - MikA
1
谢谢,@MikA!我认为这是网络上唯一有记录的地方。 - sambol

3
建立在John的答案上,如果您不需要几乎实时的流媒体要求,我们发现使用Athena进行批处理是对我们来说一个简单的解决方案。
Kinesis将流传输到给定的表“unpartitioned_event_data”,该表可以利用原生记录到达时间分区。
我们定义另一个Athena表“partitioned_event_table”,该表可以定义自定义分区键,并利用Athena具有的“INSERT INTO”功能。 Athena将自动以所需格式重新分区数据,无需任何自定义消费者或新基础设施来管理。 这可以使用cron,SNS或类似Airflow的工具进行调度。
很酷的是,您可以创建一个视图,对两个表进行UNION查询,以在一个地方查询历史和实时数据。
我们实际上在雷达公司处理了这个问题,并且在此博客文章中谈到了更多的权衡

1

我的场景是:

Firehose需要将数据发送到与Glue表绑定的S3,使用Parquet格式,并启用动态分区,因为我想从我推送到Firehose的数据中考虑年、月和日,而不是默认值。

以下是可行的代码:

  rawdataFirehose:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: !Join ["-", [rawdata, !Ref AWS::StackName]]
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt rawdataS3bucket.Arn
        Prefix: parquetdata/year=!{partitionKeyFromQuery:year}/month=!{partitionKeyFromQuery:month}/day=!{partitionKeyFromQuery:day}/
        BufferingHints:
          IntervalInSeconds: 300
          SizeInMBs: 128
        ErrorOutputPrefix: errors/
        RoleARN: !GetAtt FirehoseRole.Arn
        DynamicPartitioningConfiguration:
          Enabled: true
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: "{year:.year,month:.month,day:.day}"
                - ParameterName: "JsonParsingEngine"
                  ParameterValue: "JQ-1.6"
        DataFormatConversionConfiguration:
          Enabled: true
          InputFormatConfiguration:
            Deserializer:
              HiveJsonSerDe: {}
          OutputFormatConfiguration:
            Serializer:
              ParquetSerDe: {}
          SchemaConfiguration:
            CatalogId: !Ref AWS::AccountId
            RoleARN: !GetAtt FirehoseRole.Arn
            DatabaseName: !Ref rawDataDB
            TableName: !Ref rawDataTable
            Region:
              Fn::ImportValue: AWSRegion
            VersionId: LATEST

  FirehoseRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: !Sub firehose-glue-${Envname}
          PolicyDocument: |
            {
              "Version": "2012-10-17",
              "Statement":
                [
                  {
                    "Effect": "Allow",
                    "Action":
                      [
                        "glue:*",
                        "iam:ListRolePolicies",
                        "iam:GetRole",
                        "iam:GetRolePolicy",
                        "tag:GetResources",
                        "s3:*",
                        "cloudwatch:*",
                        "ssm:*"
                      ],
                    "Resource": "*"
                  }
                ]
            }

注意:
rawDataDB 是指 Glue 数据库的引用
rawDataTable 是指表格的引用
rawdataS3bucket 是指 S3 存储桶的引用

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接