当使用AWS Firehose时,如何合并s3文件?

15

我有一个AWS Kinesis Firehose流,使用以下配置将数据放入s3:

S3 buffer size (MB)*       2
S3 buffer interval (sec)*  60

一切都正常工作。唯一的问题是Firehose为每个数据块创建一个s3文件。(在我的情况下,如屏幕截图所示,每分钟创建一个文件)。随着时间的推移,这将产生大量文件:每天1440个文件,每年525k个文件。

enter image description here

这很难管理(例如,如果我想将存储桶复制到另一个存储桶,我需要逐个复制每个文件,这将需要时间)。

两个问题:

  • 有没有办法告诉Kinesis将旧文件分组/合并在一起。(例如,24小时之前的文件被分组成一天的数据块)。
  • 当从大量s3文件中COPY而不仅仅是几个文件时,如何影响COPY redshift性能?我没有精确测量过,但根据我的经验,大量小文件的性能相当差。从我记得的情况来看,当使用大文件时,大约2M行的COPY约为~1分钟。具有许多小文件(〜11k个文件)的2M行,则需要多达30分钟。

我关注的两个主要问题是:

  • 更好的redshift COPY性能(来自s3)
  • 更轻松的整体s3文件管理(备份,任何类型的操作)
4个回答

8
你最简单的解决方法是增加firehose缓冲区大小和时间限制 - 你可以增加到15分钟,这将把每天1440个文件减少到每天96个文件(当然如果你达到了文件大小限制则不成立)。
除此之外,在Kinesis中没有任何可以为你连接文件的功能,但你可以设置一个S3生命周期事件,每当创建新的Kinesis文件时触发,并添加一些你自己的代码(也许运行在EC2上或使用Lambda无服务器),以便自行进行连接操作。
无法评价redshift的加载性能,但我认为这不是个大问题,如果是的话,我相信AWS会对性能进行优化,因为这是他们设置的使用模式。

2
我遇到了类似的问题,文件数量太多难以处理。这里有一个可以有用的解决方法:
i)将缓冲区大小增加到最大值(128 MB);
ii)将时间缓冲区增加到最大值(900秒);
iii)不要每次只发布一条记录,将多个记录组合在一起(通过分隔符分隔),形成一个Kinesis Firehose记录(KF记录的最大大小为:1000 KB);
iv)同时,将多个Kinesis Firehose记录组合成批次,然后进行批量插入(http://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html)。
这将使发布到S3的一个对象成为:Kinesis Firehose流可以容纳的批次数。
希望这能有所帮助。

是的。不幸的是,在我的情况下,我希望记录能够快速交付。我不能等待900秒,因为我想要半实时的新鲜数据。因此,我正在考虑一种解决方案,即将所有数据加载到Redshift中,然后在单个(或仅几个)S3文件中一次性卸载所有内容。 - Benjamin Crouzier
另一个适合使用情况的想法: i)在您的S3存储桶上设置AWS Lambda。 ii)根据您的需要保留AWS Kinesis Firehose流设置。 iii)因此,将有太多文件,如问题所述。 iv)现在,每当发布到存储桶时,Lambda函数应该触发,将多个文件合并成一个并将其放入不同的存储桶中。如果您不想将其放入不同的存储桶中,则可以将其放入具有不同前缀的相同存储桶中,以便不再触发Lambda函数。这将更简单。 - psychorama

2
Kinesis Firehose旨在实现事件的准实时处理。它被优化用于此类用例,因此您可以设置较小且更频繁的文件。这样一来,您将更快地获取数据以供在Redshift中查询,或更频繁地在较小的文件上调用Lambda函数。
服务客户通常也会为更长时间的历史查询准备数据。即使可以在Redshift上运行这些长期查询,使用EMR进行这些查询可能是有意义的。然后,您可以将Redshift集群保持调整,以用于更受欢迎的最近事件(例如,在SSD上为3个月的“热点”集群和在HDD上为1年的“冷”集群)。
您可以采用Firehose输出S3存储桶中较小(未压缩?)的文件,并将它们转移到更适合EMR(Hadoop / Spark / Presto)的格式。您可以使用诸如S3DistCp 之类的服务,或类似的函数,该函数将获取较小的文件,将其连接并将其格式转换为Parquet格式。
关于针对Redshift COPY的优化,存在聚合事件的时间和复制时间之间的平衡。确实,在将文件复制到Redshift时,最好拥有较大的文件,因为每个文件都有一定的开销。但另一方面,如果您仅每15分钟复制一次数据,则可能会有“空闲”时间,在这些复制命令之间您不使用网络或群集摄取事件的能力。您应该找到适合业务(您需要多新鲜的事件)和技术方面(您可以在一个小时/天内摄取多少事件到Redshift中)的平衡点。

0

我真的很喜欢@psychorama提出的这个解决方案。实际上,我可以在我的项目中使用相同的方法,在我准备放弃firehose解决方案的情况下。由于我正在从dynamodb读取数据并将它们放入kinesis firehose中,我实际上可以将来自dynamodb的整个批处理数据合并为一条记录,并带有限制,然后将其发送到firehose。但不确定这个解决方案是否容易实现。也许在第二个版本中。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接