Create a zip file from a generator in Python?

25

I have a large amount of data (a couple of gigabytes) that I need to write to a zip file in Python. I can't load it all into memory at once to pass to the .writestr method of ZipFile, and I really don't want to spool it all out to disk using temporary files and then read it back.

Is there a way to feed a generator or a file-like object to the ZipFile library? Or is there some reason this capability isn't supported?

By "zip file", I mean a zip file, as supported by Python's zipfile package.


I said as much in both the title and the first sentence. I've added the clarification, though I don't see why it should be needed. If any generic compression would have done, I'd have said so in the first place. - Chris B.
It seems that in most contexts ZIP means GZIP, so when you say ZIP (as in PKWare ZIP) the distinction has to be spelled out. Yes, it's baffling why people confuse PKWare Zip with GZip. - S.Lott
I suspect the "gzip support" in the best-known compression applications (WinZip, PKWARE Zip and 7zip) may lead people to assume a gzip implementation comes for free. - altunyurt
13 Answers

13

The only solution is to rewrite the method it uses for zipping files so that it reads from a buffer. It would be trivial to add this to the standard library; I'm rather amazed it hasn't been done yet. I gather there's broad agreement that the entire interface needs an overhaul, and that seems to be blocking any incremental improvements.

import zipfile, zlib, binascii, struct
class BufferedZipFile(zipfile.ZipFile):
    def writebuffered(self, zipinfo, buffer):
        zinfo = zipinfo

        zinfo.file_size = file_size = 0
        zinfo.flag_bits = 0x00
        zinfo.header_offset = self.fp.tell()

        self._writecheck(zinfo)
        self._didModify = True

        zinfo.CRC = CRC = 0
        zinfo.compress_size = compress_size = 0
        self.fp.write(zinfo.FileHeader())
        if zinfo.compress_type == zipfile.ZIP_DEFLATED:
            cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
        else:
            cmpr = None

        # Stream from the buffer in 8 KiB chunks so the whole
        # payload never needs to be in memory at once.
        while True:
            buf = buffer.read(1024 * 8)
            if not buf:
                break

            file_size = file_size + len(buf)
            CRC = binascii.crc32(buf, CRC) & 0xffffffff
            if cmpr:
                buf = cmpr.compress(buf)
                compress_size = compress_size + len(buf)

            self.fp.write(buf)

        if cmpr:
            buf = cmpr.flush()
            compress_size = compress_size + len(buf)
            self.fp.write(buf)
            zinfo.compress_size = compress_size
        else:
            zinfo.compress_size = file_size

        zinfo.CRC = CRC
        zinfo.file_size = file_size

        # Seek back and patch the local file header with the real
        # CRC and sizes, now that they are known.
        position = self.fp.tell()
        self.fp.seek(zinfo.header_offset + 14, 0)
        self.fp.write(struct.pack("<LLL", zinfo.CRC, zinfo.compress_size, zinfo.file_size))
        self.fp.seek(position, 0)
        self.filelist.append(zinfo)
        self.NameToInfo[zinfo.filename] = zinfo
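
For illustration, here is a minimal usage sketch (the file names are placeholders, and it assumes the Python 2 era zipfile internals this class relies on). Any object with a read(size) method works as the buffer, so an open file, a pipe, or a generator wrapped in a file-like object can all be streamed:

import zipfile

zf = BufferedZipFile('out.zip', 'w', zipfile.ZIP_DEFLATED)
zinfo = zipfile.ZipInfo('big.bin')
zinfo.compress_type = zipfile.ZIP_DEFLATED
with open('big.bin', 'rb') as source:
    zf.writebuffered(zinfo, source)  # reads and compresses in 8 KiB chunks
zf.close()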

See my answer, which takes this approach further: https://dev59.com/aXVC5IYBdhLWcg3wbghT#2734156 - haridsv
There was a bug in the code as originally posted: the struct.pack format argument should be "<LLL", and the line with CRC = binascii.crc32(buf, CRC) should be CRC = binascii.crc32(buf, CRC) & 0xffffffff. I worked this out from the contents of the zipfile.py module in Python 2.7; it was this bug that caused the struct.error exception I was hitting. Using zlib this way makes sense too. - Erik Kaplun
You can use the approach from https://dev59.com/RGw15IYBdhLWcg3wYKmo#6658949 to turn any iterator (including a generator) into a stream (the type expected for the buffer parameter). - lambacck
For some reason I couldn't get this to work on Python 2.7.9. I didn't dig into why, since it works as expected on 2.7.10 and 2.7.8. - lambacck
See the answer below for a solution that doesn't require self.fp to be seekable. - jkitchen

12

Changed in Python 3.5 (from the official docs): "Added support for writing to unseekable streams."

This means that zipfile.ZipFile can now be given a stream that does not keep the whole file in memory. Such streams do not support seeking over the whole body of data.

So this is a simple generator:

from zipfile import ZipFile, ZipInfo

def zipfile_generator(path, stream):
    with ZipFile(stream, mode='w') as zf:
        z_info = ZipInfo.from_file(path)
        with open(path, 'rb') as entry, zf.open(z_info, mode='w') as dest:
            for chunk in iter(lambda: entry.read(16384), b''):
                dest.write(chunk)
                # Yield chunk of the zip file stream in bytes.
                yield stream.get()
    # ZipFile was closed.
    yield stream.get()

path is a string path or a path-like object pointing to the big file or directory.

stream is an instance of an unseekable stream class, whose design is inspired by the official docs:

from io import RawIOBase

class UnseekableStream(RawIOBase):
    def __init__(self):
        self._buffer = b''

    def writable(self):
        return True

    def write(self, b):
        if self.closed:
            raise ValueError('Stream was closed!')
        self._buffer += b
        return len(b)

    def get(self):
        chunk = self._buffer
        self._buffer = b''
        return chunk

You can try this code online: https://repl.it/@IvanErgunov/zipfilegenerator
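
To show how the generator is consumed, here is a minimal sketch (the paths are placeholders); the yielded chunks could just as well go to a socket or an HTTP response instead of a local file:

stream = UnseekableStream()
with open('out.zip', 'wb') as f:
    for chunk in zipfile_generator('/path/to/big_file', stream):
        f.write(chunk)  # each chunk is the zip bytes produced so far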


There is also another way to create such a generator, without ZipInfo and without manually reading and splitting your big file. You can pass a queue.Queue() object into your UnseekableStream() object and write to that queue in another thread; in the current thread you then simply read chunks from the queue in an iterable way (see the docs). A rough sketch of this threaded variant follows below.

P.S. Python Zipstream by allanlei is an outdated and unreliable approach; it was an attempt to add support for unseekable streams before that support existed officially.
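
Here is a rough, untested sketch of that threaded variant (the class and function names are mine, purely illustrative): the zipping runs in a worker thread that writes into a queue-backed stream, while the calling thread iterates over the chunks as they arrive:

import queue
import threading
from io import RawIOBase
from zipfile import ZipFile, ZipInfo

class QueueStream(RawIOBase):
    # Unseekable, write-only stream that pushes every write into a queue.
    def __init__(self, q):
        self.q = q

    def writable(self):
        return True

    def write(self, b):
        self.q.put(bytes(b))
        return len(b)

def zip_in_thread(path):
    q = queue.Queue()

    def worker():
        with ZipFile(QueueStream(q), mode='w') as zf:
            z_info = ZipInfo.from_file(path)
            with open(path, 'rb') as entry, zf.open(z_info, mode='w') as dest:
                for chunk in iter(lambda: entry.read(16384), b''):
                    dest.write(chunk)
        q.put(None)  # sentinel: the archive is complete

    threading.Thread(target=worker, daemon=True).start()
    return iter(q.get, None)  # yields chunks until the sentinel

Consuming it in the current thread is then just: for chunk in zip_in_thread(path): ...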

This is really a Python 3.6+ solution, since ZipFile.open could only open entries for reading in 3.5. - Ax The B
I tried running this code - by default it stores the file without compression. The problem is that even with compression enabled, it still stores the file effectively uncompressed. Here's the code I used (basically just added compression=ZIP_DEFLATED): https://gist.github.com/egor83/4c818d002f5f12c8740d83271eca51a4 Any advice on how to enable compression? - egor83
@egor83 Hi, you should pass the compression argument to ZipFile(..., compression=...) and set zip_info.compress_type = .... Here is an example: https://replit.com/@IvanErgunov/zipfilegeneratorcompressed#main.py - don_vanchos
@egor83 The Python docs with the extra options are here - don_vanchos

9

I took Chris B.'s answer and created a complete solution. Here it is, in case anyone else is interested:

import os
import threading
from zipfile import *
import zlib, binascii, struct

class ZipEntryWriter(threading.Thread):
    def __init__(self, zf, zinfo, fileobj):
        self.zf = zf
        self.zinfo = zinfo
        self.fileobj = fileobj

        zinfo.file_size = 0
        zinfo.flag_bits = 0x00
        zinfo.header_offset = zf.fp.tell()

        zf._writecheck(zinfo)
        zf._didModify = True

        zinfo.CRC = 0
        zinfo.compress_size = compress_size = 0
        zf.fp.write(zinfo.FileHeader())

        super(ZipEntryWriter, self).__init__()

    def run(self):
        zinfo = self.zinfo
        zf = self.zf
        file_size = 0
        CRC = 0

        if zinfo.compress_type == ZIP_DEFLATED:
            cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
        else:
            cmpr = None
        while True:
            buf = self.fileobj.read(1024 * 8)
            if not buf:
                self.fileobj.close()
                break

            file_size = file_size + len(buf)
            CRC = binascii.crc32(buf, CRC) & 0xffffffff
            if cmpr:
                buf = cmpr.compress(buf)
                compress_size = compress_size + len(buf)

            zf.fp.write(buf)

        if cmpr:
            buf = cmpr.flush()
            compress_size = compress_size + len(buf)
            zf.fp.write(buf)
            zinfo.compress_size = compress_size
        else:
            zinfo.compress_size = file_size

        zinfo.CRC = CRC
        zinfo.file_size = file_size

        # Seek back and patch the local file header with the real
        # CRC and sizes (the format string must be "<LLL").
        position = zf.fp.tell()
        zf.fp.seek(zinfo.header_offset + 14, 0)
        zf.fp.write(struct.pack("<LLL", zinfo.CRC, zinfo.compress_size, zinfo.file_size))
        zf.fp.seek(position, 0)
        zf.filelist.append(zinfo)
        zf.NameToInfo[zinfo.filename] = zinfo

class EnhZipFile(ZipFile, object):

    def _current_writer(self):
        return getattr(self, 'cur_writer', None)

    def assert_no_current_writer(self):
        cur_writer = self._current_writer()
        if cur_writer and cur_writer.isAlive():
            raise ValueError('An entry is already started for name: %s' % cur_writer.zinfo.filename)

    def write(self, filename, arcname=None, compress_type=None):
        self.assert_no_current_writer()
        super(EnhZipFile, self).write(filename, arcname, compress_type)

    def writestr(self, zinfo_or_arcname, bytes):
        self.assert_no_current_writer()
        super(EnhZipFile, self).writestr(zinfo_or_arcname, bytes)

    def close(self):
        self.finish_entry()
        super(EnhZipFile, self).close()

    def start_entry(self, zipinfo):
        """
        Start writing a new entry with the specified ZipInfo and return a
        file like object. Any data written to the file like object is
        read by a background thread and written directly to the zip file.
        Make sure to close the returned file object, before closing the
        zipfile, or the close() would end up hanging indefinitely.

        Only one entry can be open at any time. If multiple entries need to
        be written, make sure to call finish_entry() before calling any of
        these methods:
        - start_entry
        - write
        - writestr
        It is not necessary to explicitly call finish_entry() before closing
        zipfile.

        Example:
            zf = EnhZipFile('tmp.zip', 'w')
            w = zf.start_entry(ZipInfo('t.txt'))
            w.write("some text")
            w.close()
            zf.close()
        """
        self.assert_no_current_writer()
        r, w = os.pipe()
        self.cur_writer = ZipEntryWriter(self, zipinfo, os.fdopen(r, 'r'))
        self.cur_writer.start()
        return os.fdopen(w, 'w')

    def finish_entry(self, timeout=None):
        """
        Ensure that the ZipEntry that is currently being written is finished.
        Joins on any background thread to exit. It is safe to call this method
        multiple times.
        """
        cur_writer = self._current_writer()
        if not cur_writer or not cur_writer.isAlive():
            return
        cur_writer.join(timeout)

if __name__ == "__main__":
    zf = EnhZipFile('c:/tmp/t.zip', 'w')
    import time
    w = zf.start_entry(ZipInfo('t.txt', time.localtime()[:6]))
    w.write("Line1\n")
    w.write("Line2\n")
    w.close()
    zf.finish_entry()
    w = zf.start_entry(ZipInfo('p.txt', time.localtime()[:6]))
    w.write("Some text\n")
    w.close()
    zf.close()

What about a non-threaded version? If your intent is to gain performance, threads don't add performance in Python. - Erik Kaplun
This is nice, but when I try to use it like BytesIO (for an XML document created with XMLGenerator), I get an exception when I try to close the file: Traceback (most recent call last): File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 810, in __bootstrap_inner self.run() File "bufzip.py", line 62, in run zf.fp.write(struct.pack("<LLL", zinfo.CRC, zinfo.compress_size, zinfo.file_size)) error: integer out of range for 'L' format code - Charlie Clark
As Erik pointed out above, there is a typo in the format string: it must be "<LLL", and the CRC needs to be CRC = binascii.crc32(buf, CRC) & 0xffffffff. - Charlie Clark
Apart from exposing the write stream, the rest of the code comes from the Python built-in class, so perhaps the bug exists there as well? - haridsv

3

gzip.GzipFile writes the data in gzipped chunks, and you can set the size of the chunks according to the number of lines read from the source file.

An example:

file = gzip.GzipFile('blah.gz', 'wb')
sourcefile = open('source', 'rb')
chunks = []
for line in sourcefile:
  chunks.append(line)
  if len(chunks) >= X: 
      file.write("".join(chunks))
      file.flush()
      chunks = []

For some unstated reason, the result has to be a ZIP file, not a GZIP file or any other kind of compressed file. [The comments were getting rude, so I've removed them.] - S.Lott
Unstated isn't the same as unclear. The archives I produce need to be opened by office staff in a Windows environment; they all have zip utilities, but not GZIP. - Chris B.

3

The essential compression is done by zlib.compressobj. ZipFile (under Python 2.5 on MacOSX) appears to be compiled; the Python 2.3 version is as follows.

You can see that it builds the compressed file in 8k chunks. Taking out the source-file information is complicated, because a lot of source-file attributes (like the uncompressed size) are recorded in the zip file's header.

def write(self, filename, arcname=None, compress_type=None):
    """Put the bytes from filename into the archive under the name
    arcname."""

    st = os.stat(filename)
    mtime = time.localtime(st.st_mtime)
    date_time = mtime[0:6]
    # Create ZipInfo instance to store file information
    if arcname is None:
        zinfo = ZipInfo(filename, date_time)
    else:
        zinfo = ZipInfo(arcname, date_time)
    zinfo.external_attr = st[0] << 16L      # Unix attributes
    if compress_type is None:
        zinfo.compress_type = self.compression
    else:
        zinfo.compress_type = compress_type
    self._writecheck(zinfo)
    fp = open(filename, "rb")

    zinfo.flag_bits = 0x00
    zinfo.header_offset = self.fp.tell()    # Start of header bytes
    # Must overwrite CRC and sizes with correct data later
    zinfo.CRC = CRC = 0
    zinfo.compress_size = compress_size = 0
    zinfo.file_size = file_size = 0
    self.fp.write(zinfo.FileHeader())
    zinfo.file_offset = self.fp.tell()      # Start of file bytes
    if zinfo.compress_type == ZIP_DEFLATED:
        cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
             zlib.DEFLATED, -15)
    else:
        cmpr = None
    while 1:
        buf = fp.read(1024 * 8)
        if not buf:
            break
        file_size = file_size + len(buf)
        CRC = binascii.crc32(buf, CRC)
        if cmpr:
            buf = cmpr.compress(buf)
            compress_size = compress_size + len(buf)
        self.fp.write(buf)
    fp.close()
    if cmpr:
        buf = cmpr.flush()
        compress_size = compress_size + len(buf)
        self.fp.write(buf)
        zinfo.compress_size = compress_size
    else:
        zinfo.compress_size = file_size
    zinfo.CRC = CRC
    zinfo.file_size = file_size
    # Seek backwards and write CRC and file sizes
    position = self.fp.tell()       # Preserve current position in file
    self.fp.seek(zinfo.header_offset + 14, 0)
    self.fp.write(struct.pack("<lLL", zinfo.CRC, zinfo.compress_size,
          zinfo.file_size))
    self.fp.seek(position, 0)
    self.filelist.append(zinfo)
    self.NameToInfo[zinfo.filename] = zinfo

Yes, after I posted the question I looked at the source code. You're right that it uses the file information for the header, but there's actually no need to - in fact, it overwrites some of that information afterwards. I've posted a rewritten method that does what I need. - Chris B.
I was hoping to avoid the rewrite, since it depends on the inner workings of the Python library, but there really seems to be no other way. - Chris B.

1

Some (many? most?) compression algorithms work by looking at redundancies across the entire file.

Some compression libraries will choose between several compression algorithms based on which works best on the given file.

I believe the ZipFile module does this, which is why it wants to see the entire file rather than just pieces at a time.

Hence it can't work with generators, or with files too big to load into memory. That would explain the limitation of the Zipfile library.


+1 Agreed. Although Chris B.'s answer shows it isn't impossible, I think handing the whole file to the compression algorithm may make more sense: the analysis needed to build a good Huffman coding tree is more accurate over the entire file, so the result may be smaller / better compressed. - Philip Daubmeier

1
In case anyone stumbles upon this question, which was still relevant in 2017 for Python 2.7, here's a working solution for a truly streaming zip file, with no requirement that the output be seekable as in the other cases. The secret is to set bit 3 of the general purpose bit flag (see https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT section 4.3.9.1).

Note that this implementation will always create a ZIP64-style file, allowing the streaming of arbitrarily large files. It includes an ugly hack to force the zip64 central directory record, so be aware that it will cause all zipfiles written by your process to become ZIP64-style.
import io
import time
import zipfile
import zlib
import binascii
import struct

class ByteStreamer(io.BytesIO):
    '''
    Variant on BytesIO which lets you write and consume data while
    keeping track of the total filesize written. When data is consumed
    it is removed from memory, keeping the memory requirements low.
    '''
    def __init__(self):
        super(ByteStreamer, self).__init__()
        self._tellall = 0

    def tell(self):
        return self._tellall

    def write(self, b):
        orig_size = super(ByteStreamer, self).tell()
        super(ByteStreamer, self).write(b)
        new_size = super(ByteStreamer, self).tell()
        self._tellall += (new_size - orig_size)

    def consume(self):
        bytes = self.getvalue()
        self.seek(0)
        self.truncate(0)
        return bytes

class BufferedZipFileWriter(zipfile.ZipFile):
    '''
    ZipFile writer with true streaming (input and output).
    Created zip files are always ZIP64-style because it is the only safe way to stream
    potentially large zip files without knowing the full size ahead of time.

    Example usage:
    >>> def stream():
    >>>     bzfw = BufferedZipFileWriter()
    >>>     for arc_path, buffer in inputs:  # buffer is a file-like object which supports read(size)
    >>>         for chunk in bzfw.streambuffer(arc_path, buffer):
    >>>             yield chunk
    >>>     yield bzfw.close()
    '''
    def __init__(self, compression=zipfile.ZIP_DEFLATED):
        self._buffer = ByteStreamer()
        super(BufferedZipFileWriter, self).__init__(self._buffer, mode='w', compression=compression, allowZip64=True)

    def streambuffer(self, zinfo_or_arcname, buffer, chunksize=2**16):
        if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
            zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname,
                                    date_time=time.localtime(time.time())[:6])
            zinfo.compress_type = self.compression
            zinfo.external_attr = 0o600 << 16     # ?rw-------
        else:
            zinfo = zinfo_or_arcname

        zinfo.file_size = file_size = 0
        zinfo.flag_bits = 0x08  # Streaming mode: crc and size come after the data
        zinfo.header_offset = self.fp.tell()

        self._writecheck(zinfo)
        self._didModify = True

        zinfo.CRC = CRC = 0
        zinfo.compress_size = compress_size = 0
        self.fp.write(zinfo.FileHeader())
        if zinfo.compress_type == zipfile.ZIP_DEFLATED:
            cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
        else:
            cmpr = None

        while True:
            buf = buffer.read(chunksize)
            if not buf:
                break

            file_size += len(buf)
            CRC = binascii.crc32(buf, CRC) & 0xffffffff
            if cmpr:
                buf = cmpr.compress(buf)
                compress_size += len(buf)

            self.fp.write(buf)
            compressed_bytes = self._buffer.consume()
            if compressed_bytes:
                yield compressed_bytes

        if cmpr:
            buf = cmpr.flush()
            compress_size += len(buf)
            self.fp.write(buf)
            zinfo.compress_size = compress_size
            compressed_bytes = self._buffer.consume()
            if compressed_bytes:
                yield compressed_bytes
        else:
            zinfo.compress_size = file_size

        zinfo.CRC = CRC
        zinfo.file_size = file_size

        # Write CRC and file sizes after the file data
        # Always write as zip64 -- only safe way to stream what might become a large zipfile
        fmt = '<LQQ'
        self.fp.write(struct.pack(fmt, zinfo.CRC, zinfo.compress_size, zinfo.file_size))

        self.fp.flush()
        self.filelist.append(zinfo)
        self.NameToInfo[zinfo.filename] = zinfo
        yield self._buffer.consume()

    # The close method needs to be patched to force writing a ZIP64 file
    # We'll hack ZIP_FILECOUNT_LIMIT to do the forcing
    def close(self):
        tmp = zipfile.ZIP_FILECOUNT_LIMIT
        zipfile.ZIP_FILECOUNT_LIMIT = 0
        super(BufferedZipFileWriter, self).close()
        zipfile.ZIP_FILECOUNT_LIMIT = tmp
        return self._buffer.consume()
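
To make the docstring example concrete, here is a hedged end-to-end sketch (the function and file names are mine, not from the answer) that streams one file from disk into a zip written chunk by chunk:

def write_streamed_zip(src_path, arc_name, out_path):
    bzfw = BufferedZipFileWriter()
    with open(src_path, 'rb') as src, open(out_path, 'wb') as out:
        for chunk in bzfw.streambuffer(arc_name, src):
            out.write(chunk)
        out.write(bzfw.close())  # close() returns the buffered central directory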

Nicely done, this worked flawlessly for me. I tested it on a 65MB file and a 270MB file, and my memory profiler showed less than 1MB of incremental memory regardless of input file size. The resulting zip was the same size as one created with standard tools, and it opened without any problems on both OSX and Windows. - mwag
Two small typos to fix: add import time, and in the example usage it should be BufferedZipFileWriter rather than BufferedZip64FileWriter. - mwag
I'm new to Python, but how do you create the buffer here? - Varun Singh


0

It's 2017 now. If you're still looking for an elegant way to do this, use allanlei's Python Zipstream library. So far it's probably the only library that accomplishes this well.


0

You can use stream-zip for this (full disclosure: written mostly by me).

Say you have some generators of bytes you want to zip:

def file_data_1():
    yield b'Some bytes a'
    yield b'Some bytes b'

def file_data_2():
    yield b'Some bytes c'
    yield b'Some bytes d'

You can make a single iterable of the zipped bytes of these generators:

from datetime import datetime
from stream_zip import ZIP_64, stream_zip

def zip_member_files():
    modified_at = datetime.now()
    perms = 0o600
    yield 'my-file-1.txt', modified_at, perms, ZIP_64, file_data_1()
    yield 'my-file-2.txt', modified_at, perms, ZIP_64, file_data_2()

zipped_chunks = stream_zip(zip_member_files())

And then, for example, save this iterable to disk with:

with open('my.zip', 'wb') as f:
    for chunk in zipped_chunks:
        f.write(chunk)
