Firehose->S3使用当前日期作为在S3中创建密钥的前缀。因此,这将按记录编写时间对数据进行分区。我的firehose流包含具有特定事件时间的事件。
是否有一种方法可以创建包含此事件时间的S3键? 下游处理工具依赖于每个事件在与其实际发生时间相关的“小时文件夹”中。或者那是否需要在 Firehose 完成后进行额外的处理步骤?
事件时间可以在分区键中,或者我可以使用Lambda函数从记录中解析它。
Firehose->S3使用当前日期作为在S3中创建密钥的前缀。因此,这将按记录编写时间对数据进行分区。我的firehose流包含具有特定事件时间的事件。
是否有一种方法可以创建包含此事件时间的S3键? 下游处理工具依赖于每个事件在与其实际发生时间相关的“小时文件夹”中。或者那是否需要在 Firehose 完成后进行额外的处理步骤?
事件时间可以在分区键中,或者我可以使用Lambda函数从记录中解析它。
目前,Kinesis Firehose还不允许客户控制最终S3对象的日期后缀是如何生成的。
您唯一的选择是在Kinesis Firehose之后添加一个后处理层。例如,您可以使用Data Pipeline安排一个每小时的EMR作业,读取上一个小时内写入的所有文件并将它们发布到正确的S3目标。
您需要进行一些后处理或编写自定义流式消费者(例如Lambda)来执行此操作。
在我们公司处理了大量事件之后,编写Lambda函数似乎不是一个很好的资金利用方式。相反,我们发现使用Athena进行批处理是一个非常简单的解决方案。
首先,您要将数据流式传输到Athena表格events
中,可以选择按到达时间分区。
然后,您定义另一个Athena表格,比如events_by_event_time
,它根据事件中的event_time
属性进行分区,或者根据模式中定义的任何其他属性进行分区。
最后,您安排一个进程运行Athena INSERT INTO查询,该查询从events
获取事件,并自动重新对其进行分区以使其符合events_by_event_time
的分区方式,现在您的事件已经按event_time
进行分区,无需使用EMR、数据管道或任何其他基础架构。
对于未来的读者 - Firehose支持Amazon S3对象的自定义前缀
https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html
Dynamic partitioning enables you to continuously partition streaming data in Kinesis Data Firehose by using keys within data (for example, customer_id or transaction_id) and then deliver the data grouped by these keys into corresponding Amazon Simple Storage Service (Amazon S3) prefixes.
https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
Kinesis Data Firehose使用正在写入的Amazon S3对象中包含的最旧记录的近似到达时间戳
。 - jso