使用AWS Lambda连接S3中的文件

5
有没有一种方法可以使用Lambda进行S3文件拼接?
我正在使用Firehose将数据流式传输到S3,采用最长的时间间隔(15分钟或128mb),因此每天会生成96个数据文件,但我希望将所有数据聚合到单个每日数据文件中,以便在稍后使用Spark(EMR)读取数据时获得最快的性能。
我创建了一个解决方案,其中当Firehose将新文件流式传输到S3时,会调用Lambda函数。然后该函数从源存储桶中读取(s3.GetObject)新文件和目标存储桶中的已连接每日数据文件(如果已存在,则连接之前的每日数据文件,否则创建一个新文件)。接着,将两个响应正文解码为字符串并将它们相加,然后使用s3.PutObject将其写入目标存储桶(覆盖之前的聚合文件)。
问题是,当聚合文件达到150+ MB时,Lambda函数在读取两个文件时就达到了其约1500MB的内存限制,然后失败了。
目前我只有少量数据,每天只有几百MB,但这个数量将来会呈指数级增长。对于我来说,Lambda的限制非常低,并且它们已经在处理如此小的文件时达到了极限,这很奇怪。
还有什么其他的方法可以拼接S3数据,最好是由S3对象创建事件调用或通过某种定期任务(例如每日定时)来调用?
2个回答

5

您可以使用计划事件创建一个仅每天调用一次的Lambda函数。在Lambda函数中,您应该使用Upload Part - Copy,它不需要下载Lambda函数上的文件。这个线程中已经有一个示例。


5

我建议您重新考虑是否真的想这样做:

  • S3成本将会增加。
  • 管道复杂度将会增加。
  • 从Firehose输入到Spark输入的延迟将会增加。
  • 如果单个文件注入到Spark失败(在分布式系统中这种情况很常见),您必须重新整理一个巨大的文件,可能需要对其进行分片(如果注入不是原子性的),再次上传,所有这些工作都可能需要很长时间。此时,您可能会发现恢复所需的时间太长,以至于必须推迟下一次注入...

相反,除非在该情况下不可能,否则请尽可能使Firehose文件并立即将它们发送到Spark:

  • 您可以几乎立即归档S3对象,降低成本。
  • 数据尽快在Spark中可用。
  • 如果将单个文件注入到Spark失败,则需要传输的数据量更少,如果您有自动恢复,则除非某些系统始终以全速运行(此时批量注入甚至更糟),否则几乎不会被注意到。
  • 建立TCP连接和身份验证会稍微增加一点延迟。

我不熟悉Spark,但通常这样的“管道”解决方案会涉及以下内容:

  • 定期触发器或(更好的是)Firehose输出桶上的事件侦听器,以尽快处理输入。
  • 注入器/转换器,以有效地将数据从S3移动到Spark。听起来Parquet可能会对此有所帮助。
  • 一个实时的Spark / EMR /底层数据服务实例,准备好接收数据。
  • 在底层数据服务的情况下,创建一个新的Spark集群以根据需要查询数据的某种方式。
当然,如果无法以合理的成本保持Spark数据处于准备就绪状态(但不可查询),这可能不是一个选择。注入小块数据可能非常耗时,但对于一个生产就绪的系统来说,这似乎不太可能发生。
如果您确实需要将数据分块成每日转储,您可以使用多部分上传。作为比较,我们从Firehose处理多个文件(每天多GB)的速度非常快,几乎没有额外的开销。

我的目标是创建更大的文件,这是基于我们之前设置了Firehose,使其每分钟(或1mb)写入一个新文件,但将该数据读入Spark花费了很多时间。对于每个数据约3GB的read.json,对于1个文件需要约15秒,对于小文件需要约10分钟(我现在不记得实例类型了)。我想使用Spark来完成两件事:统计和最终机器学习。对于统计,我想过滤和分组数据并获得不同的计数。我认为保持Spark EMR集群24/7运行会产生比S3更高的成本。 - V. Samma
1
我觉得我对大局仍然缺少一些理解。由于Lambda方案失败了,我考虑创建一个定时的Spark作业,从S3读取前一天的数据,并将其以Parquet格式写入另一个bucket中(这比读取普通json文件快)。但我仍然不知道你所说的立即将数据发送到Spark是什么意思。你是指Spark Streaming吗?这是否意味着要全天候运行EMR集群?我计划将所有数据保存在S3中,并为每个统计查询启动一个新的集群,读取数据,运行计算并将结果写入S3。 - V. Samma

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接