How to process large zip files with AWS Lambda?


I want to read 100,000+ files from S3 and compress them into a single large zip file. Individual files range from a few KB to 1 MB, and the final zip file can easily exceed 3 GB.

Given AWS Lambda's 3 GB memory limit and 512 MB tmp-directory storage limit, how would you implement this in .NET Core 3 on AWS Lambda?

The code below fails once the zip file grows beyond 3 GB.

    var zipStream = new MemoryStream();
    using (System.IO.Compression.ZipArchive zip = new ZipArchive(zipStream, ZipArchiveMode.Create, true))
    {
        for (int i = 0; i < sourceFiles.Count; i++)
        {
            var zipItem = zip.CreateEntry("file" + i.ToString() + ".pdf");

            using (var entryStream = zipItem.Open())
            {
                var source = GetFileFromS3(sourceFiles[i]);
                await source.CopyToAsync(entryStream);
            }
        }
    }

    // Upload the zip file to S3. For brevity, the upload code is not included.
    _s3Client.Upload(zipStream);
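
For reference, the GetFileFromS3 helper is not shown in the question. A minimal sketch that matches the call above, assuming the AWS SDK for .NET's AmazonS3Client (the _bucketName field is illustrative), could look like this; GetObjectAsync's ResponseStream reads the object body directly from S3:

    // Hypothetical helper (not in the original post): returns a stream that reads
    // the object body directly from S3. Requires the AWSSDK.S3 package
    // (using Amazon.S3; using Amazon.S3.Model;).
    private Stream GetFileFromS3(string key)
    {
        var request = new GetObjectRequest
        {
            BucketName = _bucketName, // assumed field holding the source bucket name
            Key = key
        };

        // GetObjectAsync is awaited synchronously here only to match the call site above;
        // an async variant returning Task<Stream> would be preferable.
        var response = _s3Client.GetObjectAsync(request).GetAwaiter().GetResult();
        return response.ResponseStream;
    }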

Most of the large-file examples I've seen use Node.js and don't go above 3 GB. I'm looking for a C# .NET Core example. I also want to avoid splitting the output into multiple zip files smaller than 3 GB.

1> How can I handle large files on AWS Lambda without splitting the zip into multiple files? 2> Is there an S3 stream available to read from / write to S3 directly?


If your total storage is 3 GB (RAM) + 0.5 GB (disk), you obviously can't go beyond 3.5 GB (and probably not even that). You need more space -> AWS Lambda is not the right tool for this job. - Camilo Terevinto
When you run out of memory you can use swap space, which uses a temporary file instead of RAM. It is slower, but it solves the problem. See: https://www.reddit.com/r/aws/comments/b2zijf/swap_space_when_using_aws_linux_based_ami/ - jdweng
@jdweng I think that only applies to EC2 instances, not to serverless Lambda. I could be wrong, though. - LP13
@LP13: It applies if the machine has a file system (smart card). It has nothing to do with serverless. - jdweng
Lambda is event-driven. If you only need to do this once, the simplest approach is to launch an EC2 instance with reasonable disk space, run the program, and then delete the instance. If you need to do it frequently, create an EC2 AMI with the right "contents", use it to spin up temporary EC2 instances, and run them as needed. - stdunbar
If you need to react to file uploads, you can use an AWS Batch job as an EventBridge (formerly CloudWatch Events) event target: https://docs.aws.amazon.com/batch/latest/userguide/batch-cwe-target.html - jimmone
2 Answers


As of today (December 1, 2020), you can allocate up to 10 GB of memory to a Lambda function. That may be enough for your needs, at least for now. https://aws.amazon.com/blogs/aws/new-for-aws-lambda-functions-with-up-to-10-gb-of-memory-and-6-vcpus/

If you can adjust your code so it doesn't need to keep everything in memory, another option might be to use Amazon EFS for storage. EFS support for Lambda launched earlier this year. https://aws.amazon.com/blogs/compute/using-amazon-efs-for-aws-lambda-in-your-serverless-applications/
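
A minimal sketch of that approach, assuming the function has an EFS file system mounted at /mnt/efs (the mount path, bucket/key names, and the GetFileFromS3Async helper are all illustrative): the archive is streamed to a file on EFS instead of a MemoryStream, and the finished file is uploaded with the SDK's TransferUtility, which uses multipart upload for large objects.

    // Illustrative sketch only (requires: using System.IO; using System.IO.Compression;
    // using Amazon.S3.Transfer;): stream the archive to a file on the function's EFS
    // mount instead of holding it in memory, then upload the finished file to S3.
    var zipPath = "/mnt/efs/archive.zip"; // assumed EFS mount path configured on the function

    using (var fileStream = File.Create(zipPath))
    using (var zip = new ZipArchive(fileStream, ZipArchiveMode.Create, leaveOpen: false))
    {
        for (int i = 0; i < sourceFiles.Count; i++)
        {
            var entry = zip.CreateEntry("file" + i + ".pdf");

            using (var entryStream = entry.Open())
            using (var source = await GetFileFromS3Async(sourceFiles[i])) // hypothetical helper returning the S3 response stream
            {
                await source.CopyToAsync(entryStream);
            }
        }
    }

    // TransferUtility switches to multipart upload automatically for large files.
    var transfer = new TransferUtility(_s3Client);
    await transfer.UploadAsync(zipPath, "destination-bucket", "archive.zip");

Unlike /tmp, the EFS mount is not capped at 512 MB, so a multi-GB archive can sit on disk without ever being held in memory.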



For an upload this large you need to use S3's multipart upload feature. With a little work you can write a wrapper stream that buffers each part (the minimum part size is 5 MB) in a MemoryStream and flushes it as a part.

Here is the code I wrote for this exact scenario:

using System.IO.Compression;

using Microsoft.Extensions.Logging;

using Amazon.S3;
using Amazon.S3.Model;

using Atelie.Dev.Utils;

var s3Client = new AmazonS3Client();
var req = new InitiateMultipartUploadRequest { BucketName = "...", Key = "...", ContentType = "application/zip" };
var res = await s3Client.InitiateMultipartUploadAsync(req);

var loggerFactory = new LoggerFactory();
var logger = loggerFactory.CreateLogger<S3MultipartUploadStream>();

await using (var upload = new S3MultipartUploadStream(s3Client, res, logger))
{
    // leaveOpen is mandatory: we don't want ZipArchive to close our stream
    using (var archive = new ZipArchive(upload, ZipArchiveMode.Create, leaveOpen: true))
    {
        var ze = archive.CreateEntry("...");
        await using var es = ze.Open();
        // write what you need to es
    }

    await upload.CompleteUploadAsync();
}

And here is the code for the S3MultipartUploadStream class (note: use at your own risk):

using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using Amazon.S3;
using Amazon.S3.Model;

using Microsoft.Extensions.Logging;

namespace Atelie.Dev.Utils;

public class S3MultipartUploadStream : Stream
{
    private const int MinPartSize = 5 * 1024 * 1024; // 5MB
    private const int BufferCapacity = MinPartSize;

    private readonly IAmazonS3 _s3Client;
    private readonly InitiateMultipartUploadResponse _mur;
    private readonly ILogger<S3MultipartUploadStream> _logger;
    private readonly ConcurrentBag<PartETag> _parts = new();
    private readonly ConcurrentBag<Task> _uploadTasks = new();
    private readonly CancellationTokenSource _cts = new();

    private readonly int _concurrency;

    private volatile MemoryStream _mem = new(BufferCapacity);
    private Task? _completedTask;
    private long _realPosition;
    private int _partNumber;

    public S3MultipartUploadStream(IAmazonS3 s3Client, InitiateMultipartUploadResponse mur, ILogger<S3MultipartUploadStream> logger, int concurrency = 4)
    {
        _s3Client = s3Client;
        _mur = mur;
        _logger = logger;
        _concurrency = concurrency;
    }

    /// <summary>
    /// Finishes uploading the pending parts, and marks the multipart upload as completed.
    /// If you don't call this method, the upload will be aborted when calling <see cref="Dispose"/>.
    /// </summary>
    public Task CompleteUploadAsync()
    {
        if (_completedTask != null)
            throw new InvalidOperationException("Upload already finished or aborted");

        return _completedTask = Task.Run(async () =>
        {
            if (_mem.Position > 0)
                await UploadPart(true);

            // waits for all upload tasks to finish
            Task.WaitAll(_uploadTasks.ToArray());

            var req = new CompleteMultipartUploadRequest
            {
                BucketName = _mur.BucketName,
                Key = _mur.Key,
                UploadId = _mur.UploadId,
                PartETags = _parts.ToList(),
            };
            _logger.LogInformation("Finished upload of {Count} parts", req.PartETags.Count);

            await _s3Client.CompleteMultipartUploadAsync(req);
        });
    }

    /// <summary>
    /// Aborts the upload and its parts. Called automatically if <see cref="Dispose"/> is called before
    /// <see cref="CompleteUploadAsync"/>.
    /// </summary>
    private Task AbortUploadAsync()
    {
        return _completedTask ??= Task.Run(async () =>
        {
            // cancels any running upload
            _cts.Cancel();

            var req = new AbortMultipartUploadRequest
            {
                BucketName = _mur.BucketName,
                Key = _mur.Key,
                UploadId = _mur.UploadId,
            };
            _logger.LogInformation("Aborting upload of: {Key}", req.Key);

            await _s3Client.AbortMultipartUploadAsync(req);
        });
    }

    /// <summary>
    /// Uploads the current MemoryStream asynchronously, and replaces the MemoryStream with a pristine one.
    /// </summary>
    private async Task UploadPart(bool isLastPart)
    {
        // if the current position is zero, we don't need to perform the upload
        if (_mem.Position == 0)
            return;

        if (_mem.Length < MinPartSize && !isLastPart)
            throw new InvalidOperationException("You can't upload a part smaller than 5MB, unless it's the last one");

        // if we're over the configured concurrency, wait
        while (_uploadTasks.Count(t => !t.IsCompleted) >= _concurrency)
            await Task.Delay(100, _cts.Token);

        // sets the part number and captures the current stream for upload
        var partNumber = Interlocked.Increment(ref _partNumber);
        var msToWrite = _mem;

        // creates the new stream for the next part
        _mem = new MemoryStream(BufferCapacity);

        // starts the upload task
        _uploadTasks.Add(Task.Run(async () =>
        {
            await using (msToWrite)
            {
                msToWrite.Position = 0;
                var req = new UploadPartRequest
                {
                    BucketName = _mur.BucketName,
                    Key = _mur.Key,
                    InputStream = msToWrite,
                    PartNumber = partNumber,
                    PartSize = msToWrite.Length,
                    UploadId = _mur.UploadId,
                    IsLastPart = isLastPart,
                };
                _logger.LogInformation("Part {Seq}: staring upload of {Size} bytes", req.PartNumber, req.PartSize);

                var res = await _s3Client.UploadPartAsync(req, _cts.Token);
                _parts.Add(new PartETag(req.PartNumber, res.ETag));

                _logger.LogInformation("Part {Seq}: upload finished", res.PartNumber);
            }
        }));
    }

    #region Dispose and DisposeAsync

    protected override void Dispose(bool disposing)
    {
        var endTask = AbortUploadAsync();
        endTask.GetAwaiter().GetResult();
        endTask.Dispose();

        _mem.Dispose();
        _cts.Dispose();

        base.Dispose(disposing);
    }

    public override async ValueTask DisposeAsync()
    {
        GC.SuppressFinalize(this);

        var endTask = AbortUploadAsync();

        await Task.WhenAll(_mem.DisposeAsync().AsTask(), endTask);

        endTask.Dispose();
        _cts.Dispose();

        await base.DisposeAsync();
    }

    #endregion

    #region Write and WriteAsync

    public override void Write(byte[] buffer, int offset, int count)
    {
        while (count > 0)
        {
            var limit = _mem.Capacity - (int) _mem.Position;

            if (limit == 0)
                UploadPart(false).GetAwaiter().GetResult();
            else
            {
                var written = Math.Min(limit, count);
                _mem.Write(buffer, offset, written);
                offset += written;
                count -= written;
                Interlocked.Add(ref _realPosition, written);
            }
        }
    }

    public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        await WriteAsync(buffer.AsMemory(offset, count), cancellationToken);
    }

    public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
    {
        var unwrittenBytes = buffer.Length;
        var offset = 0;
        while (unwrittenBytes > 0)
        {
            var limit = _mem.Capacity - (int) _mem.Position;

            if (limit == 0)
                await UploadPart(false);
            else
            {
                var currentSliceSize = Math.Min(limit, unwrittenBytes);
                await _mem.WriteAsync(buffer.Slice(offset, currentSliceSize), cancellationToken);
                offset += currentSliceSize;
                unwrittenBytes -= currentSliceSize;
                Interlocked.Add(ref _realPosition, currentSliceSize);
            }
        }
    }

    #endregion

    #region Remaining of Stream implementation

    public override int Read(byte[] buffer, int offset, int count) => throw new InvalidOperationException();
    public override long Seek(long offset, SeekOrigin origin) => throw new InvalidOperationException();
    public override void SetLength(long value) => throw new InvalidOperationException();

    public override void Flush() {}

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new InvalidOperationException();

    public override long Position
    {
        get => _realPosition;
        set => throw new InvalidOperationException();
    }

    #endregion
}
