是否有Python库可以在内存中操作zip归档文件,而无需使用实际的磁盘文件?
ZipFile库不允许您更新归档文件。唯一的方法似乎是将其解压缩到目录中,进行更改,然后从该目录创建一个新的zip文件。我想要在没有磁盘访问的情况下修改zip归档文件,因为我将下载它们,进行更改,然后再次上传它们,所以我没有理由存储它们。
类似于Java的ZipInputStream / ZipOutputStream的东西就可以做到这一点,不过任何能够避免磁盘访问的接口都可以。
是否有Python库可以在内存中操作zip归档文件,而无需使用实际的磁盘文件?
ZipFile库不允许您更新归档文件。唯一的方法似乎是将其解压缩到目录中,进行更改,然后从该目录创建一个新的zip文件。我想要在没有磁盘访问的情况下修改zip归档文件,因为我将下载它们,进行更改,然后再次上传它们,所以我没有理由存储它们。
类似于Java的ZipInputStream / ZipOutputStream的东西就可以做到这一点,不过任何能够避免磁盘访问的接口都可以。
import io
import zipfile
# Assemble the archive in an in-memory buffer; the disk is only touched by
# the final write at the bottom.
zip_buffer = io.BytesIO()
entries = [
    ('1.txt', io.BytesIO(b'111')),
    ('2.txt', io.BytesIO(b'222')),
]
with zipfile.ZipFile(
    zip_buffer,
    mode="a",
    compression=zipfile.ZIP_DEFLATED,
    allowZip64=False,
) as zip_file:
    for file_name, data in entries:
        zip_file.writestr(file_name, data.getvalue())

# The ZipFile has been closed by the ``with`` block above, so the buffer now
# holds the finished archive.
with open('C:/1.zip', 'wb') as f:
    f.write(zip_buffer.getvalue())
如果将 io.BytesIO(b'111') 更改为 b'111',并将 data.getvalue() 更改为 data,结果不是相同的吗? - freesoul
仅当 my_zip_data 是一个包含有效构建的zip归档文件的字节对象时(使用默认的 mode='r'),此答案才有效。传递像 zipfile.ZipFile(io.BytesIO(), mode='r') 这样的空内存缓冲区会失败,因为当 mode='r' 时,ZipFile 在实例化时会检查传入的类文件对象中是否存在“中央目录结束记录”。作为解决方法,Vladimir 的答案提供了一种构建带有空虚拟文件的zip归档文件缓冲区的方法。 - zozo
以下内容摘自 Python内存中的Zip压缩文章:
这是我在2008年5月关于使用Python在内存中进行压缩的一篇文章,由于Posterous即将关闭,现在重新发布。
我最近发现有一个需要收费的组件可以在Python中对文件进行内存中压缩。考虑到这应该是免费的,于是我编写了下面的代码。它只经过了很基本的测试,如果有人发现任何错误,请告诉我,我会及时更新。
import zipfile
import StringIO
class InMemoryZip(object):
    """Build a ZIP archive entirely in memory.

    Members are added with :meth:`append`; the finished archive is obtained
    as bytes with :meth:`read` or written to disk with :meth:`writetofile`.
    """

    def __init__(self):
        # Binary in-memory file object backing the archive (ZIP data is
        # bytes, so a text buffer is not suitable).
        self.in_memory_zip = io.BytesIO()

    def append(self, filename_in_zip, file_contents):
        """Add a member named *filename_in_zip* holding *file_contents*.

        *file_contents* is handed to ``ZipFile.writestr`` unchanged, so it
        may be ``bytes`` or ``str``.

        Returns:
            This instance, so calls can be chained.
        """
        # Re-open the buffer in append mode; leaving the ``with`` block
        # closes the ZipFile, which is what writes the central directory.
        with zipfile.ZipFile(
            self.in_memory_zip, "a", zipfile.ZIP_DEFLATED, False
        ) as zf:
            zf.writestr(filename_in_zip, file_contents)
            # Mark the files as having been created on Windows so that
            # Unix permissions are not inferred as 0000. This has to happen
            # before the archive is closed to end up in the directory.
            for zfile in zf.filelist:
                zfile.create_system = 0
        return self

    def read(self):
        """Return the complete archive as a bytes object."""
        return self.in_memory_zip.getvalue()

    def writetofile(self, filename):
        """Write the in-memory archive to the file *filename*."""
        # Binary mode: text mode would corrupt the archive on platforms
        # that translate line endings.
        with open(filename, "wb") as f:
            f.write(self.read())
if __name__ == "__main__":
    # Smoke test: add two small text members one after the other, then
    # dump the resulting archive to disk.
    imz = InMemoryZip()
    imz.append("test.txt", "Another test")
    imz.append("test2.txt", "Still another")
    imz.writetofile("test.zip")
提供的示例代码存在几个问题,其中一些问题很严重:
InMemoryZip
属性如果安装ruamel.std.zipfile
,则可获得更新的版本(我是该软件包的作者)。之后请参考相关文档。
pip install ruamel.std.zipfile
或者包括来自这里的类代码,您可以执行:
import ruamel.std.zipfile as zipfile
# Run a test
# Bind the new archive object to a name; the calls below operate on it.
imz = zipfile.InMemoryZipFile()
imz.append("test.txt", "Another test").append("test2.txt", "Still another")
imz.writetofile("test.zip")
您可以使用imz.data
将内容写入到任何需要的地方。
您还可以使用with
语句,如果提供了文件名,则在离开该上下文时ZIP的内容将被写入:
# A file name is passed here, so the archive is written to that path when
# the context is left.
with zipfile.InMemoryZipFile('test.zip') as imz:
    (
        imz
        .append("test.txt", "Another test")
        .append("test2.txt", "Still another")
    )
由于延迟写入到磁盘,您实际上可以在该上下文中从旧的 test.zip
中读取。
我正在使用Flask创建一个内存zip文件并将其作为下载返回。基于Vladimir上面的示例。"seek(0)
"花了一些时间才弄明白。
import io
import zipfile
def download_zip():
    """Build a two-file ZIP archive in memory and return it as a download.

    The original fragment ended in a bare ``return``, which is only valid
    inside a function, so it is wrapped in one here.

    NOTE(review): ``send_file`` must be in scope; presumably it comes from
    ``from flask import send_file`` -- confirm against the application.
    """
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED, False) as zip_file:
        for file_name, data in [('1.txt', io.BytesIO(b'111')), ('2.txt', io.BytesIO(b'222'))]:
            zip_file.writestr(file_name, data.getvalue())

    # Writing left the position at the end of the buffer; rewind so that
    # the buffer is read from its first byte.
    zip_buffer.seek(0)
    # NOTE(review): newer Flask releases name this parameter
    # ``download_name`` -- confirm against the installed Flask version.
    return send_file(zip_buffer, attachment_filename='filename.zip', as_attachment=True)
seek(0) 真是值得得到一枚奖章。 - undefined
下面是一个根据数据创建多文件内存zip文件的辅助函数,数据形如 {'1.txt': 'string', '2.txt': b'bytes'}
import io, zipfile
def prepare_zip_file_content(file_name_content: dict) -> bytes:
    """Return the bytes of a ZIP archive built from *file_name_content*.

    The result is ready to be saved, e.g.::

        with open('C:/1.zip', 'wb') as f:
            f.write(prepare_zip_file_content(...))

    Args:
        file_name_content: mapping of member name to content, such as
            ``{'1.txt': 'string', '2.txt': b'bytes'}``. Each value is
            passed to ``ZipFile.writestr`` unchanged.

    Returns:
        The complete archive as a bytes object. An empty mapping yields a
        valid archive without members.
    """
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED, False) as zip_file:
        for file_name, file_data in file_name_content.items():
            zip_file.writestr(file_name, file_data)

    # getvalue() returns the whole buffer regardless of the current
    # position, so no rewind is needed first.
    return zip_buffer.getvalue()
from datetime import datetime
import httpx
from stream_unzip import stream_unzip
from stream_zip import stream_zip, ZIP_64
def get_source_bytes_iter(url):
    """Yield the body of a streamed GET request to *url*, chunk by chunk."""
    with httpx.stream('GET', url) as response:
        for chunk in response.iter_bytes():
            yield chunk
def get_target_files(files):
    """Convert unzipped members into the tuples passed on to ``stream_zip``.

    Args:
        files: iterable of 3-tuples whose first item is the member name as
            bytes and whose last item is that member's content chunks; the
            middle item is ignored.

    Yields:
        ``(name, modified_at, perms, ZIP_64, chunks)`` with the name decoded
        to ``str`` and the same timestamp and permissions for every member.
    """
    # stream-unzip doesn't expose perms or modified_at, but stream-zip requires them
    perms = 0o600
    modified_at = datetime.now()

    for raw_name, _, member_chunks in files:
        # Could change name, manipulate chunks, skip a file, or yield a new file
        member = (raw_name.decode(), modified_at, perms, ZIP_64, member_chunks)
        yield member
source_url = 'https://source.test/file.zip'
target_url = 'https://target.test/file.zip'

# Every stage below is a lazy iterator/generator, so nothing is downloaded
# or re-zipped until the upload starts pulling bytes.
source_bytes_iter = get_source_bytes_iter(source_url)
source_files = stream_unzip(source_bytes_iter)
target_files = get_target_files(source_files)
target_bytes_iter = stream_zip(target_files)

# Raw byte content belongs in ``content=``; passing a byte iterator through
# ``data=`` is deprecated in httpx (``data=`` is meant for form fields).
httpx.put(target_url, content=target_bytes_iter)
你可以通过ctypes在Python中使用libarchive库 - 它提供了在内存中操作ZIP数据的方法,重点是流式处理(至少在历史上是这样)。
假设我们想要在从HTTP服务器下载时即时解压缩ZIP文件。下面是代码:
from contextlib import contextmanager
from ctypes import CFUNCTYPE, POINTER, create_string_buffer, cdll, byref, c_ssize_t, c_char_p, c_int, c_void_p, c_char
from ctypes.util import find_library
import httpx
def get_zipped_chunks(url, chunk_size=6553):
    """Yield the body of a streamed GET request to *url* in byte chunks.

    Args:
        url: address to download from.
        chunk_size: size in bytes requested for each yielded chunk; it is
            forwarded to ``iter_bytes`` (it was previously accepted but
            ignored).
            NOTE(review): the default 6553 looks like a typo for 65536 --
            left unchanged so existing callers see the same signature.
    """
    with httpx.stream('GET', url) as r:
        yield from r.iter_bytes(chunk_size=chunk_size)
def stream_unzip(zipped_chunks, chunk_size=65536):
    """Lazily extract an archive that is supplied as an iterable of bytes.

    libarchive is driven through ctypes: it pulls compressed input from
    *zipped_chunks* via a read callback, and the decompressed data of each
    entry is handed out as a generator.

    Args:
        zipped_chunks: iterable yielding the archive's bytes in order.
        chunk_size: size in bytes of the buffer used to read decompressed
            data; every yielded chunk is at most this long.

    Yields:
        ``(pathname, chunks)`` tuples, where ``pathname`` is the entry's
        path as bytes and ``chunks`` is a generator of bytes objects.
        NOTE(review): every ``chunks`` generator reads from the one shared
        archive handle, so presumably it has to be consumed before moving
        on to the next entry -- confirm against libarchive's documentation.

    Raises:
        Exception: if the archive or entry cannot be allocated, or if
            libarchive reports an error while reading.
    """
    # Library
    libarchive = cdll.LoadLibrary(find_library('archive'))

    # Callback types
    open_callback_type = CFUNCTYPE(c_int, c_void_p, c_void_p)
    read_callback_type = CFUNCTYPE(c_ssize_t, c_void_p, c_void_p, POINTER(POINTER(c_char)))
    close_callback_type = CFUNCTYPE(c_int, c_void_p, c_void_p)

    # Function types
    libarchive.archive_read_new.restype = c_void_p
    libarchive.archive_read_open.argtypes = [c_void_p, c_void_p, open_callback_type, read_callback_type, close_callback_type]
    libarchive.archive_read_finish.argtypes = [c_void_p]

    libarchive.archive_entry_new.restype = c_void_p

    libarchive.archive_read_next_header.argtypes = [c_void_p, c_void_p]
    libarchive.archive_read_support_compression_all.argtypes = [c_void_p]
    libarchive.archive_read_support_format_all.argtypes = [c_void_p]

    libarchive.archive_entry_pathname.argtypes = [c_void_p]
    libarchive.archive_entry_pathname.restype = c_char_p

    libarchive.archive_read_data.argtypes = [c_void_p, POINTER(c_char), c_ssize_t]
    libarchive.archive_read_data.restype = c_ssize_t

    libarchive.archive_error_string.argtypes = [c_void_p]
    libarchive.archive_error_string.restype = c_char_p

    ARCHIVE_EOF = 1
    ARCHIVE_OK = 0

    it = iter(zipped_chunks)
    compressed_bytes = None  # Make sure not garbage collected

    @contextmanager
    def get_archive():
        # Allocate the read handle and always release it again.
        archive = libarchive.archive_read_new()
        if not archive:
            raise Exception('Unable to allocate archive')

        try:
            yield archive
        finally:
            libarchive.archive_read_finish(archive)

    def read_callback(archive, client_data, buffer):
        # Called by libarchive whenever it needs more compressed input.
        nonlocal compressed_bytes

        try:
            compressed_bytes = create_string_buffer(next(it))
        except StopIteration:
            # No more input: report end of data.
            return 0
        else:
            buffer[0] = compressed_bytes
            # create_string_buffer appends a NUL terminator that is not
            # part of the payload.
            return len(compressed_bytes) - 1

    def uncompressed_chunks(archive):
        uncompressed_bytes = create_string_buffer(chunk_size)
        while (num := libarchive.archive_read_data(archive, uncompressed_bytes, len(uncompressed_bytes))) > 0:
            # ``.raw`` rather than ``.value``: ``.value`` stops at the first
            # NUL byte and would silently truncate binary content.
            yield uncompressed_bytes.raw[:num]
        if num < 0:
            raise Exception(libarchive.archive_error_string(archive))

    # ctypes keeps no reference to callback objects itself, so they are
    # bound to locals to stay alive for as long as libarchive may call them.
    open_cb = open_callback_type(0)
    read_cb = read_callback_type(read_callback)
    close_cb = close_callback_type(0)

    with get_archive() as archive:
        libarchive.archive_read_support_compression_all(archive)
        libarchive.archive_read_support_format_all(archive)

        libarchive.archive_read_open(
            archive, 0,
            open_cb, read_cb, close_cb,
        )
        entry = c_void_p(libarchive.archive_entry_new())
        if not entry:
            raise Exception('Unable to allocate entry')

        while (status := libarchive.archive_read_next_header(archive, byref(entry))) == ARCHIVE_OK:
            yield (libarchive.archive_entry_pathname(entry), uncompressed_chunks(archive))

        if status != ARCHIVE_EOF:
            raise Exception(libarchive.archive_error_string(archive))
zipped_chunks = get_zipped_chunks('https://domain.test/file.zip')
files = stream_unzip(zipped_chunks)

# Iterate the generator created above instead of calling stream_unzip a
# second time on the same chunk iterator.
for name, uncompressed_chunks in files:
    print(name)
    for uncompressed_chunk in uncompressed_chunks:
        print(uncompressed_chunk)
# Placeholder members: (path inside the archive, content as bytes).
files = [("example_dir/example_file.txt", b"example content")]

with io.BytesIO() as raw:
    with zipfile.ZipFile(raw, "a", zipfile.ZIP_DEFLATED, False) as archive:
        for file_name, file_data in files:
            archive.writestr(file_name, file_data)
    # The ZipFile has to be closed before the buffer is read -- closing is
    # what writes the central directory. Leaving the inner ``with`` block
    # does that, so no explicit close() call is needed.
    # The bytes are taken from the buffer (``raw``); the ZipFile object
    # itself has no getbuffer()/getvalue().
    requests.post(addr, files={"file": ("zip_name.zip", raw.getvalue())})