Python zipfile 线程安全吗?

10
在Django项目中,我需要为数据库中的对象生成一些PDF文件。由于每个文件需要几秒钟才能生成,因此我使用celery异步运行任务。
问题是,我需要将每个文件添加到zip归档文件中。我计划使用Python zipfile模块,但不同的任务可以在不同的线程中运行,我想知道如果两个任务同时尝试将文件添加到归档文件中会发生什么。
以下代码是否线程安全?我在Python官方文档中找不到任何有价值的信息。
try:
    zippath = os.path.join(pdf_directory, 'archive.zip')
    zipfile = ZipFile(zippath, 'a')
    zipfile.write(pdf_fullname)
finally:
    zipfile.close()

注意:此代码运行在 Python 2.6 环境下。

你使用哪种celery并发方法?如果你的代码在celery任务中使用默认的多进程并发方法执行,那么它们将在单独的进程中执行,你不需要担心线程安全问题。 - mher
那么,问题不在于线程安全,而在于同时对文件进行写入访问。 - Thibault J
4个回答

6
不是线程安全的意义上,如果你在向同一个zip文件添加内容,则需要进行锁定,否则文件内容可能会被破坏。如果你在向不同的zip文件添加内容,使用单独的ZipFile()对象,则不需要锁定,是安全的。

2
虽然这个问题很旧,但它仍然排名靠前在谷歌搜索结果中,所以我想插一句话说,在Windows上的Python 3.4 64位版本中,lzma zipfile是线程安全的,而其他压缩方式则失败。
with zipfile.ZipFile("test.zip", "w", zipfile.ZIP_LZMA) as zip:
    #do stuff in threads

请注意,您不能将同一文件与多个zipfile.ZipFile实例绑定,而必须在所有线程中使用相同的实例;这里指的是名为zip的变量。
在我的情况下,我在8个核心和SSD上获得了大约80-90%的CPU利用率,这很好。

2
Python 3.5.5使得对ZipFile的写入和读取多个ZipExtFiles线程安全:https://docs.python.org/3.5/whatsnew/changelog.html#id93 据我了解,这个变化并没有被迁移到Python 2.7。
更新:经过代码研究和测试,发现锁定还没有完全实现。它只能正确处理“writestr”,而不能处理“open”和“write”。

1
请在更改日志中查找“bpo-14099”。 - Andre Holzner
1
@AndreHolzner 是的,那就是我想说的。至少在3.5.5版本中,我的不完全支持的评论仍然是正确的。 - Eugene Pakhomov

0

我尝试了上面提到的解决方案,但无法在Python 3.9.13下使它们工作。 ZipFile会阻止您尝试使用以下错误消息一次写入多个文件:

ValueError: Can't write to ZIP archive while an open writing handle exists.

问题在于压缩是我机器上的重头戏,没有理由不在多个线程上进行压缩。只有写本身必须是分离的。 因此,我前进并编写了一个类来实现线程安全的writestr函数,并将其与ThreadPoolExecutor一起使用,在我的机器上获得了10倍的速度提升。 新的writestr函数允许在多个线程中发生压缩,而写入则在线程锁下发生,我忽略了_writing属性导致异常首先发生的原因。
这有点hacky,但对我有效。
新类的代码如下:
import zipfile, time

class EmptyCompressor(object):
    def flush(self):
        return bytes(0)

class ZipFileParallel(zipfile.ZipFile):
    def writestr(self, zinfo_or_arcname, data,
                 compress_type=None, compresslevel=None):
        """Write a file into the archive.  The contents is 'data', which
        may be either a 'str' or a 'bytes' instance; if it is a 'str',
        it is encoded as UTF-8 first.
        'zinfo_or_arcname' is either a ZipInfo instance or
        the name of the file in the archive."""
        if isinstance(data, str):
            data = data.encode("utf-8")
        if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
            zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname,
                            date_time=time.localtime(time.time())[:6])
            zinfo.compress_type = self.compression
            zinfo._compresslevel = self.compresslevel
            if zinfo.filename[-1] == '/':
                zinfo.external_attr = 0o40775 << 16  # drwxrwxr-x
                zinfo.external_attr |= 0x10  # MS-DOS directory flag
            else:
                zinfo.external_attr = 0o600 << 16  # ?rw-------
        else:
            zinfo = zinfo_or_arcname

        if not self.fp:
            raise ValueError(
                "Attempt to write to ZIP archive that was already closed")

        if compress_type is not None:
            zinfo.compress_type = compress_type

        if compresslevel is not None:
            zinfo._compresslevel = compresslevel

        zinfo.file_size = len(data)  # Uncompressed size
        crc = zipfile.crc32(data, 0)
        # compress data
        compressor = zipfile._get_compressor(zinfo.compress_type, zinfo._compresslevel)
        data = compressor.compress(data)
        data += compressor.flush()

        with self._lock:
            with self.open(zinfo, mode='w') as dest:
                dest._compressor = None # remove the compressor so it doesn't compress again
                dest.write(data)
                dest._crc = crc
                dest._file_size = zinfo.file_size
                dest._compress_size = len(data)
                dest._compressor = EmptyCompressor() # use an empty compressor

使用它的一个示例如下:

file = ZipFileParallel('file.zip','w',zipfile.ZIP_BZIP2)
with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
    
        for img in enumerate(images):
            fname = f'images/{idx}.raw'
            futures.append(executor.submit(file.writestr, fname ,img.tobytes()))
    
    concurrent.futures.wait(futures)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接