从S3读取ZIP文件而无需下载整个文件

16

我们有一些大小为5-10GB的ZIP文件。典型的ZIP文件有5-10个内部文件,每个文件未压缩时的大小为1-5GB。

我有一套很好的Python工具,可以读取这些文件。基本上,只要打开一个文件名,如果是ZIP文件,这些工具就会在ZIP文件中搜索并打开压缩文件。这一切都非常透明。

我想把这些文件作为压缩文件存储在Amazon S3中。我可以获取S3文件的范围,因此应该可以获取ZIP中央目录(它位于文件末尾,因此我只需读取最后64KiB),找到所需的组件,下载并直接流式传输到调用进程。

那么我的问题是,如何通过标准的Python ZipFile API实现这一点?文档中没有记录如何将文件系统传输替换为支持POSIX语义的任意对象。是否可以在不重写模块的情况下实现这一点?

4个回答

6

以下是一种无需获取整个文件的方法(完整版本可在这里找到)。

但它确实需要boto(或boto3),除非您可以通过AWS CLI模拟分段的GETs; 我想这也是很可能的。

import sys
import zlib
import zipfile
import io

import boto
from boto.s3.connection import OrdinaryCallingFormat


# range-fetches a S3 key
def fetch(key, start, len):
    end = start + len - 1
    return key.get_contents_as_string(headers={"Range": "bytes=%d-%d" % (start, end)})


# parses 2 or 4 little-endian bits into their corresponding integer value
def parse_int(bytes):
    val = ord(bytes[0]) + (ord(bytes[1]) << 8)
    if len(bytes) > 3:
        val += (ord(bytes[2]) << 16) + (ord(bytes[3]) << 24)
    return val


"""
bucket: name of the bucket
key:    path to zipfile inside bucket
entry:  pathname of zip entry to be retrieved (path/to/subdir/file.name)    
"""

# OrdinaryCallingFormat prevents certificate errors on bucket names with dots
# https://stackoverflow.com/questions/51604689/read-zip-files-from-amazon-s3-using-boto3-and-python#51605244
_bucket = boto.connect_s3(calling_format=OrdinaryCallingFormat()).get_bucket(bucket)
_key = _bucket.get_key(key)

# fetch the last 22 bytes (end-of-central-directory record; assuming the comment field is empty)
size = _key.size
eocd = fetch(_key, size - 22, 22)

# start offset and size of the central directory
cd_start = parse_int(eocd[16:20])
cd_size = parse_int(eocd[12:16])

# fetch central directory, append EOCD, and open as zipfile!
cd = fetch(_key, cd_start, cd_size)
zip = zipfile.ZipFile(io.BytesIO(cd + eocd))


for zi in zip.filelist:
    if zi.filename == entry:
        # local file header starting at file name length + file content
        # (so we can reliably skip file name and extra fields)

        # in our "mock" zipfile, `header_offset`s are negative (probably because the leading content is missing)
        # so we have to add to it the CD start offset (`cd_start`) to get the actual offset

        file_head = fetch(_key, cd_start + zi.header_offset + 26, 4)
        name_len = parse_int(file_head[0:2])
        extra_len = parse_int(file_head[2:4])

        content = fetch(_key, cd_start + zi.header_offset + 30 + name_len + extra_len, zi.compress_size)

        # now `content` has the file entry you were looking for!
        # you should probably decompress it in context before passing it to some other program

        if zi.compress_type == zipfile.ZIP_DEFLATED:
            print zlib.decompressobj(-15).decompress(content)
        else:
            print content
        break

在您的情况下,您可能需要将获取的内容写入本地文件(由于文件过大),除非内存使用不是一个问题。


你的方法直接解释了zipfile中央目录,但你可能没有实现Python本地zipfile实现的所有压缩类型。如果你看一下我的实现,我还实现了缓存,并使用aws cli进行范围查询(当我开始时我不知道它可以这样做,但它确实可以!) - vy32
@vy32 你说的在理论上是正确的;然而(至少在Ubuntu Linux上的Python 2.7中)ZipFile除了STOREDEFLATE之外不支持任何其他压缩方法(根据ZipFile文档(compression: ZIP_STORED(无压缩)或ZIP_DEFLATED(需要zlib))以及实现;事实上,我基于/usr/lib/python2.7/zipfile.py中的ZipFile源代码实现了我的解压逻辑:))(对于错过你已经使用范围查询的事实感到抱歉;看来在发布我的回答之前我没有仔细阅读你的回答!) - Janaka Bandara
事实上,我追踪了Python ZipFile的执行,并发现它是最优的,但我发现我必须添加缓存才能使性能达到合理水平。 - vy32
1
这太棒了。 - Daniel777
1
非常有帮助,但请注意它无法处理zip64,这意味着如果您有超过64K个条目,您将无法访问它们。我正在调整此问题,规范说明条目将在正常的中央目录之外。 - BrianY
显示剩余3条评论

3
所以这里是代码,允许您像打开普通文件一样打开Amazon S3上的文件。请注意,我使用aws命令,而不是boto3 Python模块。(我无法访问boto3。)您可以打开文件并在其上查找。文件会在本地缓存。如果您使用Python ZipFile API打开文件并且它是ZipFile,则可以读取各个部分。但是,您无法进行写入,因为S3不支持部分写入。

另外,我实现了s3open(),它可以打开要读取或写入的文件,但它没有实现ZipFile所需的查找接口。

from urllib.parse import urlparse
from subprocess import run,Popen,PIPE
import copy
import json
import os
import tempfile

# Tools for reading and write files from Amazon S3 without boto or boto3
# http://boto.cloudhackers.com/en/latest/s3_tut.html
# but it is easier to use the aws cli, since it's configured to work.

def s3open(path, mode="r", encoding=None):
    """
    Open an s3 file for reading or writing. Can handle any size, but cannot seek.
    We could use boto.
    http://boto.cloudhackers.com/en/latest/s3_tut.html
    but it is easier to use the aws cli, since it is present and more likely to work.
    """
    from subprocess import run,PIPE,Popen
    if "b" in mode:
        assert encoding == None
    else:
        if encoding==None:
            encoding="utf-8"
    assert 'a' not in mode
    assert '+' not in mode

    if "r" in mode:
        p = Popen(['aws','s3','cp',path,'-'],stdout=PIPE,encoding=encoding)
        return p.stdout

    elif "w" in mode:
        p = Popen(['aws','s3','cp','-',path],stdin=PIPE,encoding=encoding)
        return p.stdin
    else:
        raise RuntimeError("invalid mode:{}".format(mode))




CACHE_SIZE=4096                 # big enough for front and back caches
MAX_READ=65536*16
debug=False
class S3File:
    """Open an S3 file that can be seeked. This is done by caching to the local file system."""
    def __init__(self,name,mode='rb'):
        self.name   = name
        self.url    = urlparse(name)
        if self.url.scheme != 's3':
            raise RuntimeError("url scheme is {}; expecting s3".format(self.url.scheme))
        self.bucket = self.url.netloc
        self.key    = self.url.path[1:]
        self.fpos   = 0
        self.tf     = tempfile.NamedTemporaryFile()
        cmd = ['aws','s3api','list-objects','--bucket',self.bucket,'--prefix',self.key,'--output','json']
        data = json.loads(Popen(cmd,encoding='utf8',stdout=PIPE).communicate()[0])
        file_info = data['Contents'][0]
        self.length = file_info['Size']
        self.ETag   = file_info['ETag']

        # Load the caches

        self.frontcache = self._readrange(0,CACHE_SIZE) # read the first 1024 bytes and get length of the file
        if self.length > CACHE_SIZE:
            self.backcache_start = self.length-CACHE_SIZE
            if debug: print("backcache starts at {}".format(self.backcache_start))
            self.backcache  = self._readrange(self.backcache_start,CACHE_SIZE)
        else:
            self.backcache  = None

    def _readrange(self,start,length):
        # This is gross; we copy everything to the named temporary file, rather than a pipe
        # because the pipes weren't showing up in /dev/fd/?
        # We probably want to cache also... That's coming
        cmd = ['aws','s3api','get-object','--bucket',self.bucket,'--key',self.key,'--output','json',
               '--range','bytes={}-{}'.format(start,start+length-1),self.tf.name]
        if debug:print(cmd)
        data = json.loads(Popen(cmd,encoding='utf8',stdout=PIPE).communicate()[0])
        if debug:print(data)
        self.tf.seek(0)         # go to the beginning of the data just read
        return self.tf.read(length) # and read that much

    def __repr__(self):
        return "FakeFile<name:{} url:{}>".format(self.name,self.url)

    def read(self,length=-1):
        # If length==-1, figure out the max we can read to the end of the file
        if length==-1:
            length = min(MAX_READ, self.length - self.fpos + 1)

        if debug:
            print("read: fpos={}  length={}".format(self.fpos,length))
        # Can we satisfy from the front cache?
        if self.fpos < CACHE_SIZE and self.fpos+length < CACHE_SIZE:
            if debug:print("front cache")
            buf = self.frontcache[self.fpos:self.fpos+length]
            self.fpos += len(buf)
            if debug:print("return 1: buf=",buf)
            return buf

        # Can we satisfy from the back cache?
        if self.backcache and (self.length - CACHE_SIZE < self.fpos):
            if debug:print("back cache")
            buf = self.backcache[self.fpos - self.backcache_start:self.fpos - self.backcache_start + length]
            self.fpos += len(buf)
            if debug:print("return 2: buf=",buf)
            return buf

        buf = self._readrange(self.fpos, length)
        self.fpos += len(buf)
        if debug:print("return 3: buf=",buf)
        return buf

    def seek(self,offset,whence=0):
        if debug:print("seek({},{})".format(offset,whence))
        if whence==0:
            self.fpos = offset
        elif whence==1:
            self.fpos += offset
        elif whence==2:
            self.fpos = self.length + offset
        else:
            raise RuntimeError("whence={}".format(whence))
        if debug:print("   ={}  (self.length={})".format(self.fpos,self.length))

    def tell(self):
        return self.fpos

    def write(self):
        raise RuntimeError("Write not supported")

    def flush(self):
        raise RuntimeError("Flush not supported")

    def close(self):
        return

3

这是已有解决方案的改进版本 - 现在它使用boto3并处理大于4GiB的文件:

import boto3
import io
import struct
import zipfile

s3 = boto3.client('s3')

EOCD_RECORD_SIZE = 22
ZIP64_EOCD_RECORD_SIZE = 56
ZIP64_EOCD_LOCATOR_SIZE = 20

MAX_STANDARD_ZIP_SIZE = 4_294_967_295

def lambda_handler(event):
    bucket = event['bucket']
    key = event['key']
    zip_file = get_zip_file(bucket, key)
    print_zip_content(zip_file)

def get_zip_file(bucket, key):
    file_size = get_file_size(bucket, key)
    eocd_record = fetch(bucket, key, file_size - EOCD_RECORD_SIZE, EOCD_RECORD_SIZE)
    if file_size <= MAX_STANDARD_ZIP_SIZE:
        cd_start, cd_size = get_central_directory_metadata_from_eocd(eocd_record)
        central_directory = fetch(bucket, key, cd_start, cd_size)
        return zipfile.ZipFile(io.BytesIO(central_directory + eocd_record))
    else:
        zip64_eocd_record = fetch(bucket, key,
                                  file_size - (EOCD_RECORD_SIZE + ZIP64_EOCD_LOCATOR_SIZE + ZIP64_EOCD_RECORD_SIZE),
                                  ZIP64_EOCD_RECORD_SIZE)
        zip64_eocd_locator = fetch(bucket, key,
                                   file_size - (EOCD_RECORD_SIZE + ZIP64_EOCD_LOCATOR_SIZE),
                                   ZIP64_EOCD_LOCATOR_SIZE)
        cd_start, cd_size = get_central_directory_metadata_from_eocd64(zip64_eocd_record)
        central_directory = fetch(bucket, key, cd_start, cd_size)
        return zipfile.ZipFile(io.BytesIO(central_directory + zip64_eocd_record + zip64_eocd_locator + eocd_record))


def get_file_size(bucket, key):
    head_response = s3.head_object(Bucket=bucket, Key=key)
    return head_response['ContentLength']

def fetch(bucket, key, start, length):
    end = start + length - 1
    response = s3.get_object(Bucket=bucket, Key=key, Range="bytes=%d-%d" % (start, end))
    return response['Body'].read()

def get_central_directory_metadata_from_eocd(eocd):
    cd_size = parse_little_endian_to_int(eocd[12:16])
    cd_start = parse_little_endian_to_int(eocd[16:20])
    return cd_start, cd_size

def get_central_directory_metadata_from_eocd64(eocd64):
    cd_size = parse_little_endian_to_int(eocd64[40:48])
    cd_start = parse_little_endian_to_int(eocd64[48:56])
    return cd_start, cd_size

def parse_little_endian_to_int(little_endian_bytes):
    format_character = "i" if len(little_endian_bytes) == 4 else "q"
    return struct.unpack("<" + format_character, little_endian_bytes)[0]

def print_zip_content(zip_file):
    files = [zi.filename for zi in zip_file.filelist]
    print(f"Files: {files}")

谢谢。然而,你的解决方案编码了很多关于zipfile本身的知识,而我的解决方案将其封装在python的“zipfile”模块内部。我的解决方案还实现了缓存,我们发现这是实现显著性能所必需的。 - vy32
是的,它与您的解决方案不同。这是该SO问题https://dev59.com/GlUK5IYBdhLWcg3wuSFD#52455004中另一个解决方案的更新版本。 - kwiecien
我曾经在理解ZIP规范方面遇到了很多困难,所以我希望我的代码片段能够帮助到其他人 :) - kwiecien
你的代码片段很有用,ZIP规范在32位和64位ZIP文件方面很令人困惑。我的解决方案的优点是您不需要了解ZIP规范 - 规范的知识被封装到“zipfile”中。此外,“zipfile”模块经过充分测试,并且可能会保持最新状态。 - vy32
所以,如果我有zip_file = get_zip_file(bucket, key),为什么zip_file.extractall('.')不起作用?它会抛出ValueError: negative seek value -2661536 - alanwilter

0
import io


class S3File(io.RawIOBase):
    def __init__(self, s3_object):
        self.s3_object = s3_object
        self.position = 0

    def __repr__(self):
        return "<%s s3_object=%r>" % (type(self).__name__, self.s3_object)

    @property
    def size(self):
        return self.s3_object.content_length

    def tell(self):
        return self.position

    def seek(self, offset, whence=io.SEEK_SET):
        if whence == io.SEEK_SET:
            self.position = offset
        elif whence == io.SEEK_CUR:
            self.position += offset
        elif whence == io.SEEK_END:
            self.position = self.size + offset
        else:
            raise ValueError("invalid whence (%r, should be %d, %d, %d)" % (
                whence, io.SEEK_SET, io.SEEK_CUR, io.SEEK_END
            ))

        return self.position

    def seekable(self):
        return True

    def read(self, size=-1):
        if size == -1:
            # Read to the end of the file
            range_header = "bytes=%d-" % self.position
            self.seek(offset=0, whence=io.SEEK_END)
        else:
            new_position = self.position + size

            # If we're going to read beyond the end of the object, return
            # the entire object.
            if new_position >= self.size:
                return self.read()

            range_header = "bytes=%d-%d" % (self.position, new_position - 1)
            self.seek(offset=size, whence=io.SEEK_CUR)

        return self.s3_object.get(Range=range_header)["Body"].read()

    def readable(self):
        return True


if __name__ == "__main__":
    import zipfile

    import boto3

    s3 = boto3.resource("s3")
    s3_object = s3.Object(bucket_name="bukkit", key="bagit.zip")

    s3_file = S3File(s3_object)

    with zipfile.ZipFile(s3_file) as zf:
        print(zf.namelist())

参考:


这个代码非常简洁,但是它没有实现缓存。在你的经验中,这是否会成为一个问题? - vy32

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接