I have a large amount of data (a few gigabytes) that I need to write to a zip file in Python. I can't load it all into memory at once to pass to the .writestr method of ZipFile, and I really don't want to spool it all out to disk using temporary files and then read it back.
Is there a way to feed a generator or a file-like object to the ZipFile library? Or is there some reason this capability isn't supported?
By zip file I mean a zip file, as supported by Python's zipfile package.
The only solution is to rewrite the method it uses for compressing files so that it reads from a buffer. It would be trivial to add this to the standard library; I'm kind of amazed it hasn't been done yet. I gather there's a lot of agreement that the entire interface needs to be overhauled, and that seems to be blocking any incremental improvements.
import zipfile, zlib, binascii, struct

class BufferedZipFile(zipfile.ZipFile):
    def writebuffered(self, zipinfo, buffer):
        zinfo = zipinfo

        zinfo.file_size = file_size = 0
        zinfo.flag_bits = 0x00
        zinfo.header_offset = self.fp.tell()

        self._writecheck(zinfo)
        self._didModify = True

        # Write the local file header now; the CRC and sizes are patched in afterwards.
        zinfo.CRC = CRC = 0
        zinfo.compress_size = compress_size = 0
        self.fp.write(zinfo.FileHeader())
        if zinfo.compress_type == zipfile.ZIP_DEFLATED:
            cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
        else:
            cmpr = None

        # Consume the buffer in 8k chunks, compressing as we go.
        while True:
            buf = buffer.read(1024 * 8)
            if not buf:
                break

            file_size = file_size + len(buf)
            CRC = binascii.crc32(buf, CRC) & 0xffffffff
            if cmpr:
                buf = cmpr.compress(buf)
                compress_size = compress_size + len(buf)

            self.fp.write(buf)

        if cmpr:
            buf = cmpr.flush()
            compress_size = compress_size + len(buf)
            self.fp.write(buf)
            zinfo.compress_size = compress_size
        else:
            zinfo.compress_size = file_size

        zinfo.CRC = CRC
        zinfo.file_size = file_size

        # Seek back and overwrite the CRC and sizes in the header.
        position = self.fp.tell()
        self.fp.seek(zinfo.header_offset + 14, 0)
        self.fp.write(struct.pack("<LLL", zinfo.CRC, zinfo.compress_size, zinfo.file_size))
        self.fp.seek(position, 0)
        self.filelist.append(zinfo)
        self.NameToInfo[zinfo.filename] = zinfo
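For example, a minimal usage sketch (the file names here are illustrative): open the data source as a file-like object and hand it to writebuffered together with a ZipInfo.

import zipfile

zf = BufferedZipFile('out.zip', mode='w')
zinfo = zipfile.ZipInfo('big-input.bin')
zinfo.compress_type = zipfile.ZIP_DEFLATED
with open('big-input.bin', 'rb') as src:
    zf.writebuffered(zinfo, src)   # streams src into the archive in 8k chunks
zf.close()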
The fmt argument of struct.pack should be "<LLL", and the line containing CRC = binascii.crc32(buf, CRC) should be CRC = binascii.crc32(buf, CRC) & 0xffffffff. I worked this out from the contents of the zipfile.py module in Python 2.7; this mistake was what caused the struct.error exception I ran into. Also, using zlib makes sense. - Erik Kaplun
… (the type of the third argument, buffer). - lambacck
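As a quick illustration of the CRC point in the comment above (a minimal sketch; on Python 2, binascii.crc32 can return a negative signed integer, which the unsigned "L" format code rejects):

import binascii, struct

crc = binascii.crc32(b'example data', 0)
# On Python 2 this value may be negative; struct.pack('<L', crc) would then
# raise struct.error. Masking forces it into the unsigned 32-bit range:
crc &= 0xffffffff
packed = struct.pack('<L', crc)  # safe on both Python 2 and Python 3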
Changed in Python 3.5 (from the official docs): added support for writing to unseekable streams.
This means that for zipfile.ZipFile we can now use a stream that does not store the entire file in memory. Such streams do not support seeking over the whole data volume.
So here is a simple generator:
from zipfile import ZipFile, ZipInfo

def zipfile_generator(path, stream):
    with ZipFile(stream, mode='w') as zf:
        z_info = ZipInfo.from_file(path)
        with open(path, 'rb') as entry, zf.open(z_info, mode='w') as dest:
            for chunk in iter(lambda: entry.read(16384), b''):
                dest.write(chunk)
                # Yield chunk of the zip file stream in bytes.
                yield stream.get()
    # ZipFile was closed.
    yield stream.get()
path is a string path or a path-like object pointing to the big file or directory.
stream is an instance of the unseekable stream class below, whose design is inspired by the official docs:
from io import RawIOBase

class UnseekableStream(RawIOBase):
    def __init__(self):
        self._buffer = b''

    def writable(self):
        return True

    def write(self, b):
        if self.closed:
            raise ValueError('Stream was closed!')
        self._buffer += b
        return len(b)

    def get(self):
        chunk = self._buffer
        self._buffer = b''
        return chunk
You can try this code online: https://repl.it/@IvanErgunov/zipfilegenerator
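For instance, a minimal sketch of consuming the generator, here just writing the chunks back to a file on disk (in practice you would more likely send them over a network):

stream = UnseekableStream()
with open('out.zip', 'wb') as f:
    for chunk in zipfile_generator('/path/to/big/file', stream):
        f.write(chunk)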
… you can also create the ZipInfo manually and read and split the big file yourself. You can pass a queue.Queue() object to your UnseekableStream() object and write to this queue in another thread. Then, in the current thread, you can read chunks from this queue in an iterable way. See the docs.
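A sketch of one way to wire that up (an assumption about the exact arrangement: here the producer thread runs the generator and puts chunks on a bounded queue, with None as an end-of-data sentinel):

import queue
import threading

def produce_zip(path, q):
    # Runs in a worker thread: produce zip chunks and put them on the queue.
    stream = UnseekableStream()
    for chunk in zipfile_generator(path, stream):
        q.put(chunk)
    q.put(None)  # sentinel: no more chunks

q = queue.Queue(maxsize=8)  # bounded, so the producer cannot run far ahead
t = threading.Thread(target=produce_zip, args=('/path/to/big/file', q))
t.start()
for chunk in iter(q.get, None):  # read chunks iterably in the current thread
    pass  # e.g. write each chunk to a socket or HTTP response
t.join()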
P.S. Python Zipstream by allanlei is an outdated and unreliable approach; it was an attempt to add support for unseekable streams before they were officially supported.
… (with compression=ZIP_DEFLATED): https://gist.github.com/egor83/4c818d002f5f12c8740d83271eca51a4 Any advice on how to enable compression? - egor83
Pass compression to the constructor, ZipFile(..., compression=...), and set zip_info.compress_type = ... on the entry. There is an example here: https://replit.com/@IvanErgunov/zipfilegeneratorcompressed#main.py - don_vanchos
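A minimal sketch of that compressed variant (it mirrors the zipfile_generator above; ZIP_DEFLATED comes from the zipfile module):

from zipfile import ZipFile, ZipInfo, ZIP_DEFLATED

def zipfile_generator_compressed(path, stream):
    with ZipFile(stream, mode='w', compression=ZIP_DEFLATED) as zf:
        z_info = ZipInfo.from_file(path)
        z_info.compress_type = ZIP_DEFLATED  # the entry's own setting is what counts for zf.open
        with open(path, 'rb') as entry, zf.open(z_info, mode='w') as dest:
            for chunk in iter(lambda: entry.read(16384), b''):
                dest.write(chunk)
                yield stream.get()
    yield stream.get()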
I took Chris B.'s answer and created a complete solution. Here it is, in case anyone else is interested:
import os
import threading
from zipfile import *
import zlib, binascii, struct

class ZipEntryWriter(threading.Thread):
    def __init__(self, zf, zinfo, fileobj):
        self.zf = zf
        self.zinfo = zinfo
        self.fileobj = fileobj

        zinfo.file_size = 0
        zinfo.flag_bits = 0x00
        zinfo.header_offset = zf.fp.tell()
        zf._writecheck(zinfo)
        zf._didModify = True
        zinfo.CRC = 0
        zinfo.compress_size = compress_size = 0
        zf.fp.write(zinfo.FileHeader())

        super(ZipEntryWriter, self).__init__()

    def run(self):
        zinfo = self.zinfo
        zf = self.zf
        file_size = 0
        CRC = 0

        if zinfo.compress_type == ZIP_DEFLATED:
            cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
        else:
            cmpr = None
        while True:
            buf = self.fileobj.read(1024 * 8)
            if not buf:
                self.fileobj.close()
                break

            file_size = file_size + len(buf)
            CRC = binascii.crc32(buf, CRC) & 0xffffffff
            if cmpr:
                buf = cmpr.compress(buf)
                compress_size = compress_size + len(buf)

            zf.fp.write(buf)

        if cmpr:
            buf = cmpr.flush()
            compress_size = compress_size + len(buf)
            zf.fp.write(buf)
            zinfo.compress_size = compress_size
        else:
            zinfo.compress_size = file_size

        zinfo.CRC = CRC
        zinfo.file_size = file_size

        # Seek back and overwrite the CRC and sizes in the header.
        position = zf.fp.tell()
        zf.fp.seek(zinfo.header_offset + 14, 0)
        zf.fp.write(struct.pack("<LLL", zinfo.CRC, zinfo.compress_size, zinfo.file_size))
        zf.fp.seek(position, 0)
        zf.filelist.append(zinfo)
        zf.NameToInfo[zinfo.filename] = zinfo

class EnhZipFile(ZipFile, object):

    def _current_writer(self):
        return hasattr(self, 'cur_writer') and self.cur_writer or None

    def assert_no_current_writer(self):
        cur_writer = self._current_writer()
        if cur_writer and cur_writer.isAlive():
            raise ValueError('An entry is already started for name: %s' % cur_writer.zinfo.filename)

    def write(self, filename, arcname=None, compress_type=None):
        self.assert_no_current_writer()
        super(EnhZipFile, self).write(filename, arcname, compress_type)

    def writestr(self, zinfo_or_arcname, bytes):
        self.assert_no_current_writer()
        super(EnhZipFile, self).writestr(zinfo_or_arcname, bytes)

    def close(self):
        self.finish_entry()
        super(EnhZipFile, self).close()

    def start_entry(self, zipinfo):
        """
        Start writing a new entry with the specified ZipInfo and return a
        file-like object. Any data written to the file-like object is
        read by a background thread and written directly to the zip file.
        Make sure to close the returned file object before closing the
        zipfile, or the close() would end up hanging indefinitely.

        Only one entry can be open at any time. If multiple entries need to
        be written, make sure to call finish_entry() before calling any of
        these methods:

        - start_entry
        - write
        - writestr

        It is not necessary to explicitly call finish_entry() before closing
        the zipfile.

        Example:
          zf = EnhZipFile('tmp.zip', 'w')
          w = zf.start_entry(ZipInfo('t.txt'))
          w.write("some text")
          w.close()
          zf.close()
        """
        self.assert_no_current_writer()
        r, w = os.pipe()
        self.cur_writer = ZipEntryWriter(self, zipinfo, os.fdopen(r, 'r'))
        self.cur_writer.start()
        return os.fdopen(w, 'w')

    def finish_entry(self, timeout=None):
        """
        Ensure that the ZipEntry that is currently being written is finished.
        Joins on any background thread to exit. It is safe to call this method
        multiple times.
        """
        cur_writer = self._current_writer()
        if not cur_writer or not cur_writer.isAlive():
            return
        cur_writer.join(timeout)

if __name__ == "__main__":
    zf = EnhZipFile('c:/tmp/t.zip', 'w')
    import time
    w = zf.start_entry(ZipInfo('t.txt', time.localtime()[:6]))
    w.write("Line1\n")
    w.write("Line2\n")
    w.close()
    zf.finish_entry()
    w = zf.start_entry(ZipInfo('p.txt', time.localtime()[:6]))
    w.write("Some text\n")
    w.close()
    zf.close()
Traceback (most recent call last): File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 810, in __bootstrap_inner self.run() File "bufzip.py", line 62, in run zf.fp.write(struct.pack("<LLL", zinfo.CRC, zinfo.compress_size, zinfo.file_size)) error: integer out of range for 'L' format code - Charlie Clark
The format needs to be "<LLL" and the CRC needs to be CRC = binascii.crc32(buf, CRC) & 0xffffffff. - Charlie Clark
gzip.GzipFile writes the data in gzipped chunks, and you can set the size of the chunks according to the number of lines read from the source file.
For example:
import gzip

file = gzip.GzipFile('blah.gz', 'wb')
sourcefile = open('source', 'rb')
chunks = []
for line in sourcefile:
    chunks.append(line)
    if len(chunks) >= X:
        file.write(b"".join(chunks))
        file.flush()
        chunks = []
if chunks:
    file.write(b"".join(chunks))  # write out any remaining lines
file.close()
sourcefile.close()
The essential compression is done by zlib.compressobj. ZipFile (under Python 2.5 on MacOSX) appears to be compiled. The Python 2.3 version is shown below.
You can see that it builds the compressed file in 8k chunks. Taking out the source-file information is complex, because a lot of source-file attributes (like the uncompressed size) are recorded in the zip file header.
def write(self, filename, arcname=None, compress_type=None):
    """Put the bytes from filename into the archive under the name
    arcname."""
    st = os.stat(filename)
    mtime = time.localtime(st.st_mtime)
    date_time = mtime[0:6]
    # Create ZipInfo instance to store file information
    if arcname is None:
        zinfo = ZipInfo(filename, date_time)
    else:
        zinfo = ZipInfo(arcname, date_time)
    zinfo.external_attr = st[0] << 16L      # Unix attributes
    if compress_type is None:
        zinfo.compress_type = self.compression
    else:
        zinfo.compress_type = compress_type
    self._writecheck(zinfo)
    fp = open(filename, "rb")

    zinfo.flag_bits = 0x00
    zinfo.header_offset = self.fp.tell()    # Start of header bytes
    # Must overwrite CRC and sizes with correct data later
    zinfo.CRC = CRC = 0
    zinfo.compress_size = compress_size = 0
    zinfo.file_size = file_size = 0
    self.fp.write(zinfo.FileHeader())
    zinfo.file_offset = self.fp.tell()      # Start of file bytes
    if zinfo.compress_type == ZIP_DEFLATED:
        cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
                                zlib.DEFLATED, -15)
    else:
        cmpr = None
    while 1:
        buf = fp.read(1024 * 8)
        if not buf:
            break
        file_size = file_size + len(buf)
        CRC = binascii.crc32(buf, CRC)
        if cmpr:
            buf = cmpr.compress(buf)
            compress_size = compress_size + len(buf)
        self.fp.write(buf)
    fp.close()
    if cmpr:
        buf = cmpr.flush()
        compress_size = compress_size + len(buf)
        self.fp.write(buf)
        zinfo.compress_size = compress_size
    else:
        zinfo.compress_size = file_size
    zinfo.CRC = CRC
    zinfo.file_size = file_size
    # Seek backwards and write CRC and file sizes
    position = self.fp.tell()       # Preserve current position in file
    self.fp.seek(zinfo.header_offset + 14, 0)
    self.fp.write(struct.pack("<lLL", zinfo.CRC, zinfo.compress_size,
                              zinfo.file_size))
    self.fp.seek(position, 0)
    self.filelist.append(zinfo)
    self.NameToInfo[zinfo.filename] = zinfo
Some (many? most?) compression algorithms work by looking at redundancies across the entire file.
Some compression libraries choose among several compression algorithms based on which works best on the file.
I believe the ZipFile module does this, so it wants to see the entire file rather than just pieces at a time.
Hence it can't work with generators, or with files too big to load into memory. That would explain the limitation of the Zipfile library.
import io
import time
import zipfile
import zlib
import binascii
import struct

class ByteStreamer(io.BytesIO):
    '''
    Variant on BytesIO which lets you write and consume data while
    keeping track of the total filesize written. When data is consumed
    it is removed from memory, keeping the memory requirements low.
    '''
    def __init__(self):
        super(ByteStreamer, self).__init__()
        self._tellall = 0

    def tell(self):
        return self._tellall

    def write(self, b):
        orig_size = super(ByteStreamer, self).tell()
        super(ByteStreamer, self).write(b)
        new_size = super(ByteStreamer, self).tell()
        self._tellall += (new_size - orig_size)

    def consume(self):
        bytes = self.getvalue()
        self.seek(0)
        self.truncate(0)
        return bytes

class BufferedZipFileWriter(zipfile.ZipFile):
    '''
    ZipFile writer with true streaming (input and output).
    Created zip files are always ZIP64-style because it is the only safe way to stream
    potentially large zip files without knowing the full size ahead of time.

    Example usage:
    >>> def stream():
    >>>     bzfw = BufferedZipFileWriter()
    >>>     for arc_path, buffer in inputs:  # buffer is a file-like object which supports read(size)
    >>>         for chunk in bzfw.streambuffer(arc_path, buffer):
    >>>             yield chunk
    >>>     yield bzfw.close()
    '''
    def __init__(self, compression=zipfile.ZIP_DEFLATED):
        self._buffer = ByteStreamer()
        super(BufferedZipFileWriter, self).__init__(self._buffer, mode='w', compression=compression, allowZip64=True)

    def streambuffer(self, zinfo_or_arcname, buffer, chunksize=2**16):
        if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
            zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname,
                                    date_time=time.localtime(time.time())[:6])
            zinfo.compress_type = self.compression
            zinfo.external_attr = 0o600 << 16  # ?rw-------
        else:
            zinfo = zinfo_or_arcname

        zinfo.file_size = file_size = 0
        zinfo.flag_bits = 0x08  # Streaming mode: crc and size come after the data
        zinfo.header_offset = self.fp.tell()

        self._writecheck(zinfo)
        self._didModify = True

        zinfo.CRC = CRC = 0
        zinfo.compress_size = compress_size = 0
        self.fp.write(zinfo.FileHeader())
        if zinfo.compress_type == zipfile.ZIP_DEFLATED:
            cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
        else:
            cmpr = None

        while True:
            buf = buffer.read(chunksize)
            if not buf:
                break

            file_size += len(buf)
            CRC = binascii.crc32(buf, CRC) & 0xffffffff
            if cmpr:
                buf = cmpr.compress(buf)
                compress_size += len(buf)

            self.fp.write(buf)
            compressed_bytes = self._buffer.consume()
            if compressed_bytes:
                yield compressed_bytes

        if cmpr:
            buf = cmpr.flush()
            compress_size += len(buf)
            self.fp.write(buf)
            zinfo.compress_size = compress_size
            compressed_bytes = self._buffer.consume()
            if compressed_bytes:
                yield compressed_bytes
        else:
            zinfo.compress_size = file_size

        zinfo.CRC = CRC
        zinfo.file_size = file_size

        # Write CRC and file sizes after the file data
        # Always write as zip64 -- only safe way to stream what might become a large zipfile
        fmt = '<LQQ'
        self.fp.write(struct.pack(fmt, zinfo.CRC, zinfo.compress_size, zinfo.file_size))
        self.fp.flush()

        self.filelist.append(zinfo)
        self.NameToInfo[zinfo.filename] = zinfo

        yield self._buffer.consume()

    # The close method needs to be patched to force writing a ZIP64 file
    # We'll hack ZIP_FILECOUNT_LIMIT to do the forcing
    def close(self):
        tmp = zipfile.ZIP_FILECOUNT_LIMIT
        zipfile.ZIP_FILECOUNT_LIMIT = 0
        super(BufferedZipFileWriter, self).close()
        zipfile.ZIP_FILECOUNT_LIMIT = tmp
        return self._buffer.consume()
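For instance, a sketch of driving the streaming writer and sending its output straight to a file (the file names are illustrative; each yielded chunk could equally go to a network response):

bzfw = BufferedZipFileWriter()
with open('out.zip', 'wb') as f, open('big-input.bin', 'rb') as src:
    for chunk in bzfw.streambuffer('big-input.bin', src):
        f.write(chunk)
    f.write(bzfw.close())  # close() returns the final central-directory bytes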
This needs import time, and in the example usage it should be BufferedZipFileWriter rather than BufferedZip64FileWriter. - mwag
It's 2017 now. If you are still looking for an elegant way to do this, use allanlei's Python Zipstream library. So far, it is probably the only library that accomplishes this well.
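For reference, a sketch of how that library is typically used, going from its README (an assumption, not verified against every version; file_data_source() is a hypothetical iterable of bytes):

import zipstream

z = zipstream.ZipFile(mode='w', compression=zipstream.ZIP_DEFLATED)
z.write('/path/to/big/file')                       # stream an existing file from disk
z.write_iter('generated.bin', file_data_source())  # or stream any iterable of bytes
with open('out.zip', 'wb') as f:
    for chunk in z:  # chunks are produced lazily as you iterate
        f.write(chunk)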
You can use stream-zip for this (full disclosure: written mostly by me).
Say you have some generators of bytes that you would like to zip:
def file_data_1():
    yield b'Some bytes a'
    yield b'Some bytes b'

def file_data_2():
    yield b'Some bytes c'
    yield b'Some bytes d'
You can create a single iterable of the zipped bytes of these generators:
from datetime import datetime
from stream_zip import ZIP_64, stream_zip

def zip_member_files():
    modified_at = datetime.now()
    perms = 0o600
    yield 'my-file-1.txt', modified_at, perms, ZIP_64, file_data_1()
    yield 'my-file-2.txt', modified_at, perms, ZIP_64, file_data_2()

zipped_chunks = stream_zip(zip_member_files())
And then, for example, save this iterable to disk with:
with open('my.zip', 'wb') as f:
    for chunk in zipped_chunks:
        f.write(chunk)