Upload a ZipOutputStream to S3 with the AWS S3 Java SDK, without temporarily saving the (large) zip file to disk.

14

I need to download photos (not all in the same directory) from S3 using the AWS S3 Java SDK, zip them into a ZIP file, and upload that file back to S3. The ZIP file can be several GB in size. I am currently using AWS Lambda, which limits temporary storage to 500 MB. I therefore don't want to save the ZIP file to disk; instead I want to stream the ZIP file (created on the fly from the photos downloaded from S3) directly to S3. I need to do this with the AWS S3 Java SDK.


A similar question has been answered here. - Eugene Kukhol
Images are usually already compressed (apart from *.bmp), so you can explicitly add those files without compression. I'm surprised you want one huge zip file. Your own read-only file system? - Joop Eggen
4 Answers

4
The basic idea is to work with streams. That way you don't wait for the ZIP to be generated on the file system, but start the upload as soon as the ZIP algorithm produces any data. Some data will obviously be buffered in memory, but there is still no need to wait for the whole ZIP to be written to disk. We will also use stream composition and a PipedInputStream / PipedOutputStream in two threads: one reads the data, the other zips the content.
Here is a version using the AWS SDK for Java 1.x:
final AmazonS3 client = AmazonS3ClientBuilder.defaultClient();

final PipedOutputStream pipedOutputStream = new PipedOutputStream();
final PipedInputStream pipedInputStream = new PipedInputStream(pipedOutputStream);

final Thread s3In = new Thread(() -> {
    try (final ZipOutputStream zipOutputStream = new ZipOutputStream(pipedOutputStream)) {
        S3Objects
                // It's just a convenient way to list all the objects. Replace with your own logic.
                .inBucket(client, "bucket")
                .forEach((S3ObjectSummary objectSummary) -> {
                    try {
                        if (objectSummary.getKey().endsWith(".png")) {
                            System.out.println("Processing " + objectSummary.getKey());

                            final ZipEntry entry = new ZipEntry(
                                    UUID.randomUUID().toString() + ".png" // I'm too lazy to extract file name from the
                                    // objectSummary
                            );

                            zipOutputStream.putNextEntry(entry);

                            IOUtils.copy(
                                    client.getObject(
                                            objectSummary.getBucketName(),
                                            objectSummary.getKey()
                                    ).getObjectContent(),
                                    zipOutputStream
                            );

                            zipOutputStream.closeEntry();
                        }
                    } catch (final Exception all) {
                        all.printStackTrace();
                    }
                });
    } catch (final Exception all) {
        all.printStackTrace();
    }
});
final Thread s3Out = new Thread(() -> {
    try {
        client.putObject(
                "another-bucket",
                "previews.zip",
                pipedInputStream,
                new ObjectMetadata()
        );

        pipedInputStream.close();
    } catch (final Exception all) {
        all.printStackTrace();
    }
});

s3In.start();
s3Out.start();

s3In.join();
s3Out.join();

Note, however, that it prints a warning:

WARNING: No content length specified for stream data.  Stream contents will be buffered in memory and could result in out of memory errors.

That is because S3 needs to know the size of the data before the upload starts, and the size of the generated ZIP cannot be known in advance. You can try a multipart upload, but the code gets trickier. The idea is similar, though: one thread should read the data and send the content into the ZIP stream, while another thread reads the zipped output and uploads it as parts. Once all entries (parts) have been uploaded, the multipart upload should be completed. Here is an example using the AWS SDK for Java 2.x:
final S3Client client = S3Client.create();

final PipedOutputStream pipedOutputStream = new PipedOutputStream();
final PipedInputStream pipedInputStream = new PipedInputStream(pipedOutputStream);

final Thread s3In = new Thread(() -> {
    try (final ZipOutputStream zipOutputStream = new ZipOutputStream(pipedOutputStream)) {
        client.listObjectsV2Paginator(
                ListObjectsV2Request
                        .builder()
                        .bucket("bucket")
                        .build()
        )
                .contents()
                .forEach((S3Object object) -> {
                    try {
                        if (object.key().endsWith(".png")) {
                            System.out.println("Processing " + object.key());

                            final ZipEntry entry = new ZipEntry(
                                    UUID.randomUUID().toString() + ".png" // I'm too lazy to extract file name from the object
                            );

                            zipOutputStream.putNextEntry(entry);

                            client.getObject(
                                    GetObjectRequest
                                            .builder()
                                            .bucket("bucket")
                                            .key(object.key())
                                            .build(),
                                    ResponseTransformer.toOutputStream(zipOutputStream)
                            );

                            zipOutputStream.closeEntry();
                        }
                    } catch (final Exception all) {
                        all.printStackTrace();
                    }
                });
    } catch (final Exception all) {
        all.printStackTrace();
    }
});
final Thread s3Out = new Thread(() -> {
    try {
        client.putObject(
                PutObjectRequest
                        .builder()
                        .bucket("another-bucket")
                        .key("previews.zip")
                        .build(),
                RequestBody.fromBytes(
                        IOUtils.toByteArray(pipedInputStream)
                )
        );
    } catch (final Exception all) {
        all.printStackTrace();
    }
});

s3In.start();
s3Out.start();

s3In.join();
s3Out.join();

It suffers from the same problem: the ZIP needs to be prepared in memory before the upload.
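For completeness, here is a rough sketch (not part of the original answer) of what a truly streaming s3Out thread could look like with the SDK 2.x low-level multipart API: it reads the piped ZIP stream in roughly 10 MB chunks and uploads each chunk as one part, so the whole archive never has to sit in memory. It reuses the client, pipedInputStream, bucket and key from the example above; error handling and abort-on-failure are omitted, and readNBytes requires Java 9+.

// Sketch only: this would replace the body of the s3Out thread above.
// Needs software.amazon.awssdk.services.s3.model.* plus java.util.{ArrayList, Arrays, List}.
final String uploadId = client.createMultipartUpload(
        CreateMultipartUploadRequest.builder()
                .bucket("another-bucket")
                .key("previews.zip")
                .build()
).uploadId();

final List<CompletedPart> completedParts = new ArrayList<>();
final byte[] partBuffer = new byte[10 * 1024 * 1024]; // every part except the last must be >= 5 MB
int partNumber = 1;
int read;
while ((read = pipedInputStream.readNBytes(partBuffer, 0, partBuffer.length)) > 0) {
    final UploadPartResponse partResponse = client.uploadPart(
            UploadPartRequest.builder()
                    .bucket("another-bucket")
                    .key("previews.zip")
                    .uploadId(uploadId)
                    .partNumber(partNumber)
                    .build(),
            RequestBody.fromBytes(Arrays.copyOf(partBuffer, read))
    );
    completedParts.add(
            CompletedPart.builder()
                    .partNumber(partNumber)
                    .eTag(partResponse.eTag())
                    .build()
    );
    partNumber++;
}

client.completeMultipartUpload(
        CompleteMultipartUploadRequest.builder()
                .bucket("another-bucket")
                .key("previews.zip")
                .uploadId(uploadId)
                .multipartUpload(CompletedMultipartUpload.builder().parts(completedParts).build())
                .build()
);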

If you are interested, I have prepared a demo project so you can play with the code.


1
The problem is that the AWS SDK for Java does not support streaming writes to an OutputStream for S3. The following snippet implements an 'S3OutputStream', which extends OutputStream and automatically performs either a 'putObject' or an 'initiateMultipartUpload', depending on the size. This allows you to pass this S3OutputStream to the constructor of ZipOutputStream, e.g. new ZipOutputStream(new S3OutputStream(s3Client, "my_bucket", "path"))
import java.io.ByteArrayInputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3OutputStream extends OutputStream {

    private static final Logger LOG = LoggerFactory.getLogger(S3OutputStream.class);

    /** Default chunk size is 10MB */
    protected static final int BUFFER_SIZE = 10000000;

    /** The bucket-name on Amazon S3 */
    private final String bucket;

    /** The path (key) name within the bucket */
    private final String path;

    /** The temporary buffer used for storing the chunks */
    private final byte[] buf;

    /** The position in the buffer */
    private int position;

    /** Amazon S3 client. TODO: support KMS */
    private final AmazonS3 s3Client;

    /** The unique id for this upload */
    private String uploadId;

    /** Collection of the etags for the parts that have been uploaded */
    private final List<PartETag> etags;

    /** indicates whether the stream is still open / valid */
    private boolean open;

    /**
     * Creates a new S3 OutputStream
     * @param s3Client the AmazonS3 client
     * @param bucket name of the bucket
     * @param path path within the bucket
     */
    public S3OutputStream(AmazonS3 s3Client, String bucket, String path) {
        this.s3Client = s3Client;
        this.bucket = bucket;
        this.path = path;
        this.buf = new byte[BUFFER_SIZE];
        this.position = 0;
        this.etags = new ArrayList<>();
        this.open = true;
    }

    /**
     * Write an array to the S3 output stream.
     *
     * @param b the byte-array to append
     */
    @Override
    public void write(byte[] b) {
        write(b,0,b.length);
    }

    /**
     * Writes an array to the S3 Output Stream
     *
     * @param byteArray the array to write
     * @param o the offset into the array
     * @param l the number of bytes to write
     */
    @Override
    public void write(final byte[] byteArray, final int o, final int l) {
        this.assertOpen();
        int ofs = o, len = l;
        int size;
        while (len > (size = this.buf.length - position)) {
            System.arraycopy(byteArray, ofs, this.buf, this.position, size);
            this.position += size;
            flushBufferAndRewind();
            ofs += size;
            len -= size;
        }
        System.arraycopy(byteArray, ofs, this.buf, this.position, len);
        this.position += len;
    }

    /**
     * Flushes the buffer by uploading a part to S3.
     */
    @Override
    public synchronized void flush() {
        this.assertOpen();
        LOG.debug("Flush was called");
    }

    protected void flushBufferAndRewind() {
        if (uploadId == null) {
            LOG.debug("Starting a multipart upload for {}/{}",this.bucket,this.path);
            final InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(this.bucket, this.path)
                    .withCannedACL(CannedAccessControlList.BucketOwnerFullControl);
            InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(request);
            this.uploadId = initResponse.getUploadId();
        }
        uploadPart();
        this.position = 0;
    }

    protected void uploadPart() {
        LOG.debug("Uploading part {}",this.etags.size());
        UploadPartResult uploadResult = this.s3Client.uploadPart(new UploadPartRequest()
                .withBucketName(this.bucket)
                .withKey(this.path)
                .withUploadId(this.uploadId)
                .withInputStream(new ByteArrayInputStream(buf,0,this.position))
                .withPartNumber(this.etags.size() + 1)
                .withPartSize(this.position));
        this.etags.add(uploadResult.getPartETag());
    }

    @Override
    public void close() {
        if (this.open) {
            this.open = false;
            if (this.uploadId != null) {
                if (this.position > 0) {
                    uploadPart();
                }
                LOG.debug("Completing multipart");
                this.s3Client.completeMultipartUpload(new CompleteMultipartUploadRequest(bucket, path, uploadId, etags));
            }
            else {
                LOG.debug("Uploading object at once to {}/{}",this.bucket,this.path);
                final ObjectMetadata metadata = new ObjectMetadata();
                metadata.setContentLength(this.position);
                final PutObjectRequest request = new PutObjectRequest(this.bucket, this.path, new ByteArrayInputStream(this.buf, 0, this.position), metadata)
                        .withCannedAcl(CannedAccessControlList.BucketOwnerFullControl);
                this.s3Client.putObject(request);
            }
        }
    }

    public void cancel() {
        this.open = false;
        if (this.uploadId != null) {
            LOG.debug("Aborting multipart upload");
            this.s3Client.abortMultipartUpload(new AbortMultipartUploadRequest(this.bucket, this.path, this.uploadId));
        }
    }

    @Override
    public void write(int b) {
        this.assertOpen();
        if (position >= this.buf.length) {
            flushBufferAndRewind();
        }
        this.buf[position++] = (byte)b;
    }

    private void assertOpen() {
        if (!this.open) {
            throw new IllegalStateException("Closed");
        }
    }
}
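To make the wiring concrete, here is a minimal usage sketch (not part of the original answer; the bucket names and the .png filter are placeholder assumptions) that streams objects from one bucket into a zip written straight to another bucket through the S3OutputStream above, reusing S3Objects and IOUtils from the first answer:

final AmazonS3 s3Client = AmazonS3ClientBuilder.defaultClient();

try (final ZipOutputStream zos = new ZipOutputStream(
        new S3OutputStream(s3Client, "my_bucket", "previews.zip"))) {
    for (final S3ObjectSummary summary : S3Objects.inBucket(s3Client, "photo-bucket")) {
        if (!summary.getKey().endsWith(".png")) {
            continue;
        }
        zos.putNextEntry(new ZipEntry(summary.getKey()));
        try (final InputStream in = s3Client
                .getObject(summary.getBucketName(), summary.getKey())
                .getObjectContent()) {
            IOUtils.copy(in, zos); // stream each photo straight into the current zip entry
        }
        zos.closeEntry();
    }
}
// Closing the ZipOutputStream closes the S3OutputStream, which either completes the
// multipart upload or falls back to a single putObject if everything fit in one buffer.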


0

Sorry for the late reply; I finished this task yesterday for my latest project. Please see the complete code below.

Let's assume that when we upload a file to S3 it returns the ObjectKey of the uploaded file, so I created a class called FileKey to handle this.

package com.myprojectName.model.key;

import java.time.Instant;

import javax.persistence.Entity;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Entity
@NoArgsConstructor
public class FileKey {

    private String fileObjectKey;
    
    private String fileName;
    
    private int fileSize;
    
    private String fileType;
    
}

I store the returned pre-signed URL values in FileDownloadDetailsDTO.

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;

import java.net.URL;

@NoArgsConstructor
@AllArgsConstructor
@Getter
@Builder
public class FileDownloadDetailsDTO {

    private String name;
    private Long size;
    private String contentType;
    private URL preSignedDownloadUrl;

    public FileDownloadDetailsDTO(PreSignedUrlAndMetadata objectMetadata) {
        this.name = objectMetadata.getName();
        this.size = objectMetadata.getSize();
        this.contentType = objectMetadata.getContentType();
        this.preSignedDownloadUrl = objectMetadata.getUrl();
    }

}

PreSignedUrlAndMetadata holds the URL created for the object in the S3 bucket; if you are unsure, see the code below.

public class PreSignedUrlAndMetadata {

    private final URL url;

    private final String name;

    private final String contentType;

    private final Long size;

    // The constructor and the getters (getUrl/getName/getContentType/getSize) that
    // FileDownloadDetailsDTO above relies on are omitted in the original answer.
}

The following method fetches each file from the S3 bucket, writes it as a zip entry into a zip file (created as a temporary file that is deleted once the upload finishes), and finally returns a pre-signed URL for the zip file.
 public FileDownloadDetailsDTO getDownloadFilesInZipDetails(String zipFileName, List<FileKey> files) {

        PreSignedUrlAndMetadata preSignedUrlAndMetadata;
        File zipFile = null;
        try {
            zipFile = File.createTempFile(zipFileName, "file");

            try (FileOutputStream fos = new FileOutputStream(zipFile); ZipOutputStream zos = new ZipOutputStream(fos)) {

                for (FileKey file : files) {
                    String name = null;
                    if (ObjectUtils.isNotEmpty(file.getFileName())) {
                        name = file.getFileName();
                    }
                    ZipEntry entry = new ZipEntry(name);

                    try (InputStream inputStream = getInputStreamForFileKey(file.getFileObjectKey())) {
                        zos.putNextEntry(entry);
                        IOUtils.copy(inputStream, zos);
                        zos.closeEntry();
                    }
                }
            }

            try (FileInputStream fis = new FileInputStream(zipFile)) {
                TempFileObjectKey fileObjectKey = uploadTemp(fis, zipFile.length());
                // url, metadata, contentType and USER_METADATA_NAME are project-specific
                // values (the pre-signed URL and object metadata of the uploaded zip,
                // looked up via fileObjectKey) that the original answer does not show.
                preSignedUrlAndMetadata = new PreSignedUrlAndMetadata(url, metadata.getUserMetaDataOf(USER_METADATA_NAME), contentType, metadata.getContentLength());
            }

        } catch (Exception e) {
            // archiveRequestDTO is another field from the author's project (also used in the
            // return statement below); zipFileName could be used in its place.
            throw new ApplicationException("Error while creating zip file for " + archiveRequestDTO.getArchiveName(), e, ApplicationErrorCode.INTERNAL_SERVER_ERROR);
        } finally {
            FileUtils.deleteQuietly(zipFile);
        }

        return FileDownloadDetailsDTO.builder().name(archiveRequestDTO.getArchiveName() + ".zip")
                .size(preSignedUrlAndMetadata.getSize()).preSignedDownloadUrl(preSignedUrlAndMetadata.getUrl()).build();

    }

 public InputStream getInputStreamForFileKey(String key) {
    TempFileObjectKey tempFileObjectKey = new TempFileObjectKey(getActualPrefix(key));
    return storageService.getInputStream(tempFileObjectKey);
}


String getActualPrefix(String prefix){
    return prefix.replaceAll("_","/");
}

public TempFileObjectKey uploadTemp(InputStream inputStream, Long length) {
    TempFileObjectKey tempFileObjectKey = s3StorageManager.buildTempFileFullKey();
    ObjectMetadata objectMetadata = new ObjectMetadata();
    if (length != null) {
        objectMetadata.setContentLength(length);
    }
    // TransferManager.upload is an instance method, so a TransferManager instance
    // (e.g. built with TransferManagerBuilder.standard().build()) is needed here;
    // s3StorageManager and getBucketName are helpers from the author's project.
    TransferManager transferManager = TransferManagerBuilder.standard().build();
    Upload upload = transferManager.upload(getBucketName(tempFileObjectKey), tempFileObjectKey.getObjectKey(), inputStream, objectMetadata);
    try {
        upload.waitForCompletion();
    } catch (InterruptedException e) {
        throw new ApplicationException(e.getMessage(), e, ApplicationErrorCode.INTERNAL_SERVER_ERROR);
    }
    return tempFileObjectKey;
}
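The code above references a url variable that is never produced in the snippet. As a hedged sketch (not from the original answer), a pre-signed GET URL for the uploaded zip could be generated with the SDK 1.x generatePresignedUrl call, for example like this (the method name and the one-hour expiry are assumptions; it needs com.amazonaws.HttpMethod, com.amazonaws.services.s3.model.GeneratePresignedUrlRequest, java.util.Date and java.net.URL):

// Hypothetical helper, not part of the original answer.
public URL generatePresignedDownloadUrl(AmazonS3 s3Client, String bucket, String objectKey) {
    Date expiration = new Date(System.currentTimeMillis() + 60 * 60 * 1000L); // valid for one hour
    GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucket, objectKey)
            .withMethod(HttpMethod.GET)
            .withExpiration(expiration);
    return s3Client.generatePresignedUrl(request);
}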

Hope this helps. If you have any further questions, feel free to ask me. Thanks.


How can I unzip an archive file with Lambda and move the contents to an existing key in the same bucket? Say I upload a zip file into folder 1 and my Lambda trigger unzips that file; now I want to move the result to another folder in the same bucket. How can I do that, given that S3 uses a flat object store and has no real concept of folders? - jack

0

I have created a GitHub repository; you can check my implementation here: https://github.com/yufeikang/serverless-zip-s3 The repository includes a Lambda function written in Node.js that you can use to zip an S3 directory, along with instructions for deploying and using it.


It would be great if you could add some details about the package structure and how to use it. Users may not be familiar with Node.js or Lambda. - Kasid Khan
