Python内存中的压缩库

124

是否有Python库可以在内存中操作zip归档文件,而无需使用实际的磁盘文件?

ZipFile库不允许您更新归档文件。唯一的方法似乎是将其解压缩到目录中,进行更改,然后从该目录创建一个新的zip文件。我想要在没有磁盘访问的情况下修改zip归档文件,因为我将下载它们,进行更改,然后再次上传它们,所以我没有理由存储它们。

类似于Java的ZipInputStream / ZipOutputStream的东西就可以做到这一点,尽管任何避免磁盘访问的界面都可以。


在这篇文章中,我回答了相同的问题。https://dev59.com/9rnoa4cB1Zd3GeqPWcCq - Quinten C
9个回答

127

PYTHON 3

import io
import zipfile

zip_buffer = io.BytesIO()

with zipfile.ZipFile(zip_buffer, "a",
                     zipfile.ZIP_DEFLATED, False) as zip_file:
    for file_name, data in [('1.txt', io.BytesIO(b'111')),
                            ('2.txt', io.BytesIO(b'222'))]:
        zip_file.writestr(file_name, data.getvalue())

with open('C:/1.zip', 'wb') as f:
    f.write(zip_buffer.getvalue())

2
请参阅文档链接data可以是字节或字符串,在Ubuntu和Python 3.6上运行良好。 - Edgar H
1
为什么不直接编写字节,而是将数据包装在io.BytesIO中,然后在每次迭代中使用getvalue? - freesoul
@freesoul,刚刚阅读了writestr文档,它需要... - Vladimir
@Vladimir 你的意思是什么?如果你将 io.BytesIO(b'111') 更改为 b'111',并将 data.getvalue() 更改为 data,结果不是相同的吗? - freesoul
@freesoul,BytesIO在这里是常见的使用情况。 - Vladimir

122

根据Python文档

class zipfile.ZipFile(file[, mode[, compression[, allowZip64]]])

  Open a ZIP file, where file can be either a path to a file (a string) or a file-like object. 

所以,要在内存中打开文件,只需创建一个类似文件的对象(可以使用 BytesIO)。

file_like_object = io.BytesIO(my_zip_data)
zipfile_ob = zipfile.ZipFile(file_like_object)

1
如何将不同的文件写入内存对象中?即在档案中创建a/b/c.txt和a/b/cc.txt等文件? - mathtick
1
只有当my_zip_data是一个包含有效构建的zip归档文件的字节对象时(当mode='r'时为默认值),此答案才有效。传递像zipfile.ZipFile(io.BytesIO(), mode='r')这样的空内存缓冲区会失败,因为ZipFile在实例化时检查传递的文件类似对象中是否存在“中央目录结束记录”,当mode='r'时。作为解决方法,Validimir的答案提供了一种构建带有空虚拟文件的zip归档文件缓冲区的方法。 - zozo

60

以下内容摘自 Python内存中的Zip压缩文章:

这是我在2008年5月关于使用Python在内存中进行压缩的一篇文章,由于Posterous即将关闭,现在重新发布。

我最近发现有一个需要收费的组件可以在Python中对文件进行内存中压缩。考虑到这应该是免费的,于是我编写了下面的代码。它只经过了很基本的测试,如果有人发现任何错误,请告诉我,我会及时更新。

import zipfile
import StringIO

class InMemoryZip(object):
    def __init__(self):
        # Create the in-memory file-like object
        self.in_memory_zip = StringIO.StringIO()

    def append(self, filename_in_zip, file_contents):
        '''Appends a file with name filename_in_zip and contents of 
        file_contents to the in-memory zip.'''
        # Get a handle to the in-memory zip in append mode
        zf = zipfile.ZipFile(self.in_memory_zip, "a", zipfile.ZIP_DEFLATED, False)

        # Write the file to the in-memory zip
        zf.writestr(filename_in_zip, file_contents)

        # Mark the files as having been created on Windows so that
        # Unix permissions are not inferred as 0000
        for zfile in zf.filelist:
            zfile.create_system = 0        

        return self

    def read(self):
        '''Returns a string with the contents of the in-memory zip.'''
        self.in_memory_zip.seek(0)
        return self.in_memory_zip.read()

    def writetofile(self, filename):
        '''Writes the in-memory zip to a file.'''
        f = file(filename, "w")
        f.write(self.read())
        f.close()

if __name__ == "__main__":
    # Run a test
    imz = InMemoryZip()
    imz.append("test.txt", "Another test").append("test2.txt", "Still another")
    imz.writetofile("test.zip")

1
有用的链接 - 这是一个很好的例子,展示了如何按照Jason的回答所描述的方式使用ZipFile对象。谢谢。 - John B
1
没问题,很高兴你觉得它有用。 - Justin Ethier
4
如果链接失效,您能否在这里简要概括一下链接的内容,因为如果链接失效,您的回答也将失效。 - Ivo Flipse
2
@IvoFlipse - 很好的观点。我将所有内容都添加到了这篇文章中,以防万一。 - Justin Ethier
3
在Windows操作系统或Python 3.X上无法正常工作,请参考我的答案更新代码。 - Anthon
1
非常抱歉重新激活一个旧帖子,但我尝试了提出的解决方案,但输出的zip文件损坏了。我在Windows环境中使用Python 2.7,压缩文件只有一个文件。压缩文件中的文件名最长为40个字符。 - semantic-dev

22

提供的示例代码存在几个问题,其中一些问题很严重:

  • 在Windows上无法处理真实数据。ZIP文件是二进制文件,应始终使用打开'wb'的方式写入其数据
  • 每次添加文件都会将ZIP文件附加到其中,这是低效的。它可以只被打开并保留为一个InMemoryZip属性
  • 文档说明ZIP文件应显式关闭,但在附加函数中未执行此操作(它可能适用于示例,因为zf超出范围,并且这将关闭ZIP文件)
  • 每次附加文件时都会为zipfile中的所有文件设置create_system标志,而不仅仅是每个文件一次。
  • 在Python < 3上,cStringIO比StringIO更有效率
  • 在Python 3上不起作用(原始文章发布前是3.0版本,但发布代码时3.1已经发布了很长时间)。

如果安装ruamel.std.zipfile,则可获得更新的版本(我是该软件包的作者)。之后请参考相关文档。

pip install ruamel.std.zipfile

或者包括来自这里的类代码,您可以执行:

import ruamel.std.zipfile as zipfile

# Run a test
zipfile.InMemoryZipFile()
imz.append("test.txt", "Another test").append("test2.txt", "Still another")
imz.writetofile("test.zip")  

您可以使用imz.data将内容写入到任何需要的地方。

您还可以使用with语句,如果提供了文件名,则在离开该上下文时ZIP的内容将被写入:

with zipfile.InMemoryZipFile('test.zip') as imz:
    imz.append("test.txt", "Another test").append("test2.txt", "Still another")

由于延迟写入到磁盘,您实际上可以在该上下文中从旧的 test.zip 中读取。


为什么在Python 2中不使用io.BytesIO? - boxed
@boxed 没有特别的原因,只是要检查一下2.7上的BytesIO是否使用了更快的底层C实现,而不是Python兼容性层调用StringIO(而不是CStringIO)。 - Anthon
2
这应该至少包括您编写的用于实际回答问题的代码框架,而不仅仅是告诉人们安装一个模块。如果没有其他内容,至少链接到模块的主页。 - SilverbackNet
对于Python 2.7版本,我建议在传递给writestr()函数之前将Unicode字符串转换为UTF8字符串。更多细节请参见https://dev59.com/DVgR5IYBdhLWcg3wT7xL#67111639。 - Robert Lujo

8

我正在使用Flask创建一个内存zip文件并将其作为下载返回。基于Vladimir上面的示例。"seek(0)"花了一些时间才弄明白。

import io
import zipfile

zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED, False) as zip_file:
    for file_name, data in [('1.txt', io.BytesIO(b'111')), ('2.txt', io.BytesIO(b'222'))]:
        zip_file.writestr(file_name, data.getvalue())

zip_buffer.seek(0)
return send_file(zip_buffer, attachment_filename='filename.zip', as_attachment=True)

1
你指出seek(0)真是值得得到一枚奖章。 - undefined

2

创建基于数据的多文件内存zip文件的帮助程序,例如{'1.txt': 'string', '2.txt": b'bytes'}

import io, zipfile

def prepare_zip_file_content(file_name_content: dict) -> bytes:
    """returns Zip bytes ready to be saved with 
    open('C:/1.zip', 'wb') as f: f.write(bytes)
    @file_name_content dict like {'1.txt': 'string', '2.txt": b'bytes'} 
    """
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED, False) as zip_file:
        for file_name, file_data in file_name_content.items():
            zip_file.writestr(file_name, file_data)

    zip_buffer.seek(0)
    return zip_buffer.getvalue()

这个对我来说适用于 Python 3.10.11。 - kidmose
这对我来说适用于Python 3.10.11。 - undefined

1
我希望在无需访问磁盘的情况下修改zip归档文件,因为我需要下载它们,进行更改,然后再次上传它们,所以没有必要存储它们。使用两个库https://github.com/uktrade/stream-unziphttps://github.com/uktrade/stream-zip(完全透明:由我编写),这是可能的。根据更改的不同,您甚至可以一次性将整个zip存储在内存中。例如,如果您只想下载、解压缩、压缩并重新上传,那么这样做有点无意义,但您仍然可以插入对未压缩内容的更改。
from datetime import datetime
import httpx
from stream_unzip import stream_unzip
from stream_zip import stream_zip, ZIP_64

def get_source_bytes_iter(url):
    with httpx.stream('GET', url) as r:
        yield from r.iter_bytes()

def get_target_files(files):
    # stream-unzip doesn't expose perms or modified_at, but stream-zip requires them
    modified_at = datetime.now()
    perms = 0o600

    for name, _, chunks in files:
        # Could change name, manipulate chunks, skip a file, or yield a new file
        yield name.decode(), modified_at, perms, ZIP_64, chunks

source_url = 'https://source.test/file.zip'
target_url = 'https://target.test/file.zip'

source_bytes_iter = get_source_bytes_iter(source_url)
source_files = stream_unzip(source_bytes_iter)
target_files = get_target_files(source_files)
target_bytes_iter = stream_zip(target_files)

httpx.put(target_url, data=target_bytes_iter)

0

你可以通过ctypes在Python中使用libarchive库 - 它提供了在内存中操作ZIP数据的方法,重点是流式处理(至少在历史上是这样)。

假设我们想要在从HTTP服务器下载时即时解压缩ZIP文件。下面是代码:

from contextlib import contextmanager
from ctypes import CFUNCTYPE, POINTER, create_string_buffer, cdll, byref, c_ssize_t, c_char_p, c_int, c_void_p, c_char
from ctypes.util import find_library

import httpx

def get_zipped_chunks(url, chunk_size=6553):
    with httpx.stream('GET', url) as r:
        yield from r.iter_bytes()

def stream_unzip(zipped_chunks, chunk_size=65536):
    # Library
    libarchive = cdll.LoadLibrary(find_library('archive'))

    # Callback types
    open_callback_type = CFUNCTYPE(c_int, c_void_p, c_void_p)
    read_callback_type = CFUNCTYPE(c_ssize_t, c_void_p, c_void_p, POINTER(POINTER(c_char)))
    close_callback_type = CFUNCTYPE(c_int, c_void_p, c_void_p)

    # Function types
    libarchive.archive_read_new.restype = c_void_p
    libarchive.archive_read_open.argtypes = [c_void_p, c_void_p, open_callback_type, read_callback_type, close_callback_type]
    libarchive.archive_read_finish.argtypes = [c_void_p]

    libarchive.archive_entry_new.restype = c_void_p

    libarchive.archive_read_next_header.argtypes = [c_void_p, c_void_p]
    libarchive.archive_read_support_compression_all.argtypes = [c_void_p]
    libarchive.archive_read_support_format_all.argtypes = [c_void_p]

    libarchive.archive_entry_pathname.argtypes = [c_void_p]
    libarchive.archive_entry_pathname.restype = c_char_p

    libarchive.archive_read_data.argtypes = [c_void_p, POINTER(c_char), c_ssize_t]
    libarchive.archive_read_data.restype = c_ssize_t

    libarchive.archive_error_string.argtypes = [c_void_p]
    libarchive.archive_error_string.restype = c_char_p

    ARCHIVE_EOF = 1
    ARCHIVE_OK = 0

    it = iter(zipped_chunks)
    compressed_bytes = None  # Make sure not garbage collected

    @contextmanager
    def get_archive():
        archive = libarchive.archive_read_new()
        if not archive:
            raise Exception('Unable to allocate archive')

        try:
            yield archive
        finally:
            libarchive.archive_read_finish(archive)

    def read_callback(archive, client_data, buffer):
        nonlocal compressed_bytes

        try:
            compressed_bytes = create_string_buffer(next(it))
        except StopIteration:
            return 0
        else:
            buffer[0] = compressed_bytes
            return len(compressed_bytes) - 1

    def uncompressed_chunks(archive):
        uncompressed_bytes = create_string_buffer(chunk_size)
        while (num := libarchive.archive_read_data(archive, uncompressed_bytes, len(uncompressed_bytes))) > 0:
            yield uncompressed_bytes.value[:num]
        if num < 0:
            raise Exception(libarchive.archive_error_string(archive))

    with get_archive() as archive: 
        libarchive.archive_read_support_compression_all(archive)
        libarchive.archive_read_support_format_all(archive)

        libarchive.archive_read_open(
            archive, 0,
            open_callback_type(0), read_callback_type(read_callback), close_callback_type(0),
        )
        entry = c_void_p(libarchive.archive_entry_new())
        if not entry:
            raise Exception('Unable to allocate entry')

        while (status := libarchive.archive_read_next_header(archive, byref(entry))) == ARCHIVE_OK:
            yield (libarchive.archive_entry_pathname(entry), uncompressed_chunks(archive))

        if status != ARCHIVE_EOF:
            raise Exception(libarchive.archive_error_string(archive))

可以按照以下方式来实现这一点。
zipped_chunks = get_zipped_chunks('https://domain.test/file.zip')
files = stream_unzip(zipped_chunks)

for name, uncompressed_chunks in stream_unzip(zipped_chunks):
    print(name)
    for uncompressed_chunk in uncompressed_chunks:
        print(uncompressed_chunk)

实际上,由于libarchive支持多种存档格式,并且上面的内容并不特别针对ZIP格式,因此它很可能可以与其他格式一起使用。

0
请注意,如果您想在Python之外使用新创建的内存中的Zip存档,例如保存到本地磁盘或通过POST请求发送,它需要将中央目录记录写入其中;否则,它将无法被识别为有效的ZIP文件。
这将类似于(对于Python 3.11)
with(
    io.BytesIO() as raw,
    zipfile.ZipFile(raw, "a", zipfile.ZIP_DEFLATED, False) as zip
):
    for file_name, file_data in ["example_dir/example_file.txt", bytes]:
        zip.writestr(file_name, file_data)

    zip.close()  # THIS is REQUIRED!

    requests.post(addr, files = {"file": ("zip_name.zip", zip.getbuffer())})

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接