I tried the solutions mentioned above, but I couldn't get them to work under Python 3.9.13.
ZipFile stops you from writing more than one file at a time with the following error message:
ValueError: Can't write to ZIP archive while an open writing handle exists.
The thing is, compression is the heavy lifting on my machine, and there is no reason not to do it on multiple threads; only the write itself has to be serialized.
So I went ahead and wrote a class implementing a thread-safe writestr function; using it with a ThreadPoolExecutor gave me a 10x speedup on my machine.
The new writestr lets the compression happen on multiple threads, while the write happens under a thread lock and bypasses the _writing attribute that caused the exception in the first place.
It's a bit hacky, but it works for me.
The code of the new class is below:
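For reference, the error is easy to reproduce with a stock ZipFile by opening a second writing handle while the first is still open (a minimal sketch using an in-memory archive):

```python
import io
import zipfile

buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w') as zf:
    with zf.open('a.txt', mode='w') as f1:
        f1.write(b'first file')
        try:
            # Opening a second handle while f1 is still open fails.
            zf.open('b.txt', mode='w')
        except ValueError as e:
            print(e)
```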
import time
import zipfile


class EmptyCompressor(object):
    def flush(self):
        return bytes(0)


class ZipFileParallel(zipfile.ZipFile):
    def writestr(self, zinfo_or_arcname, data,
                 compress_type=None, compresslevel=None):
        """Write a file into the archive. The contents is 'data', which
        may be either a 'str' or a 'bytes' instance; if it is a 'str',
        it is encoded as UTF-8 first.
        'zinfo_or_arcname' is either a ZipInfo instance or
        the name of the file in the archive."""
        if isinstance(data, str):
            data = data.encode("utf-8")
        if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
            zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname,
                                    date_time=time.localtime(time.time())[:6])
            zinfo.compress_type = self.compression
            zinfo._compresslevel = self.compresslevel
            if zinfo.filename[-1] == '/':
                zinfo.external_attr = 0o40775 << 16  # drwxrwxr-x
                zinfo.external_attr |= 0x10          # MS-DOS directory flag
            else:
                zinfo.external_attr = 0o600 << 16    # rw-------
        else:
            zinfo = zinfo_or_arcname

        if not self.fp:
            raise ValueError(
                "Attempt to write to ZIP archive that was already closed")
        if compress_type is not None:
            zinfo.compress_type = compress_type
        if compresslevel is not None:
            zinfo._compresslevel = compresslevel

        zinfo.file_size = len(data)
        crc = zipfile.crc32(data, 0)
        # Compress outside the lock: this is the expensive part and can
        # safely run on many threads at once.
        compressor = zipfile._get_compressor(zinfo.compress_type,
                                             zinfo._compresslevel)
        data = compressor.compress(data)
        data += compressor.flush()

        # Only the actual write into the archive is serialized.
        with self._lock:
            with self.open(zinfo, mode='w') as dest:
                dest._compressor = None  # write the pre-compressed bytes as-is
                dest.write(data)
                dest._crc = crc
                dest._file_size = zinfo.file_size
                dest._compress_size = len(data)
                dest._compressor = EmptyCompressor()  # so close() flushes nothing
An example of using it:
import concurrent.futures

file = ZipFileParallel('file.zip', 'w', zipfile.ZIP_BZIP2)
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for idx, img in enumerate(images):
        fname = f'images/{idx}.raw'
        futures.append(executor.submit(file.writestr, fname, img.tobytes()))

    concurrent.futures.wait(futures)
file.close()
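Since the class pokes at private attributes, it is worth checking afterwards that the resulting archive is valid. ZipFile.testzip re-verifies the CRC of every member, so a round-trip check looks like this (sketched here with an in-memory archive and a placeholder payload instead of real image data):

```python
import io
import zipfile

payload = b'\x00\x01\x02' * 100  # placeholder for img.tobytes()

buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w', zipfile.ZIP_BZIP2) as zf:
    zf.writestr('images/0.raw', payload)

with zipfile.ZipFile(buf, 'r') as zf:
    assert zf.testzip() is None                # all CRCs are consistent
    assert zf.read('images/0.raw') == payload  # contents survive the round trip
print('archive OK')
```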