按事件时间对Kinesis Firehose S3记录进行分区

11

Firehose->S3使用当前日期作为在S3中创建密钥的前缀。因此,这将按记录编写时间对数据进行分区。我的firehose流包含具有特定事件时间的事件。

是否有一种方法可以创建包含此事件时间的S3键? 下游处理工具依赖于每个事件在与其实际发生时间相关的“小时文件夹”中。或者那是否需要在 Firehose 完成后进行额外的处理步骤?

事件时间可以在分区键中,或者我可以使用Lambda函数从记录中解析它。

6个回答

9

目前,Kinesis Firehose还不允许客户控制最终S3对象的日期后缀是如何生成的。

您唯一的选择是在Kinesis Firehose之后添加一个后处理层。例如,您可以使用Data Pipeline安排一个每小时的EMR作业,读取上一个小时内写入的所有文件并将它们发布到正确的S3目标。


1

您需要进行一些后处理或编写自定义流式消费者(例如Lambda)来执行此操作。

在我们公司处理了大量事件之后,编写Lambda函数似乎不是一个很好的资金利用方式。相反,我们发现使用Athena进行批处理是一个非常简单的解决方案。

首先,您要将数据流式传输到Athena表格events中,可以选择按到达时间分区

然后,您定义另一个Athena表格,比如events_by_event_time,它根据事件中的event_time属性进行分区,或者根据模式中定义的任何其他属性进行分区。

最后,您安排一个进程运行Athena INSERT INTO查询,该查询从events获取事件,并自动重新对其进行分区以使其符合events_by_event_time的分区方式,现在您的事件已经按event_time进行分区,无需使用EMR、数据管道或任何其他基础架构。

你可以对事件上的任何属性进行此操作。值得注意的是,你可以创建一个视图,对两个表执行UNION以查询实时和历史事件。
我在这里的博客文章中详细介绍了这个问题。

1
这不是对问题的回答,但我想解释一下按照事件到达时间存储记录背后的思路。
首先,我们来谈谈流。Kinesis只是数据流。它有一个消费的概念。只有通过顺序读取才能可靠地消费流。还有一种检查点的概念,作为暂停和恢复消费过程的机制。检查点只是标识流中位置的序列号。通过指定此数字,可以从特定事件开始读取流。
现在回到默认的S3 Firehose设置......由于Kinesis流的容量相当有限,最可能需要将数据从Kinesis存储到某个地方以便以后分析。而“Firehose到S3设置”可以立即完成此操作。它只是将流中的原始数据存储到S3存储桶中。但逻辑上,这些数据仍然是相同的记录流。要能够可靠地消费(读取)此流,需要这些检查点的顺序号码。这些数字就是记录到达时间。
如果我想按创建时间读取记录怎么办?看起来完成这个任务的正确方法是按顺序读取s3流,将其转储到某个[时间序列]数据库或数据仓库中,并针对此存储执行基于创建时间的读取。否则,在读取s3(流)时始终存在错过一些事件批次的非零机会。因此,我不建议重新排序s3桶。

0

这并没有回答问题。问题是关于“事件时间”,即事件中的时间字段。Firehose仅支持“处理时间”,请参见:Kinesis Data Firehose使用正在写入的Amazon S3对象中包含的最旧记录的近似到达时间戳 - jso

0

0
AWS于2021年8月开始提供“动态分区”。
Dynamic partitioning enables you to continuously partition streaming data in Kinesis Data Firehose by using keys within data (for example, customer_id or transaction_id) and then deliver the data grouped by these keys into corresponding Amazon Simple Storage Service (Amazon S3) prefixes.

https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接