Python:如何创建一个类似文件的流式gzip压缩文件?

33

我正在尝试找出使用 Python 的 zlib 压缩流的最佳方法。

我有一个文件输入流(如下的input)和一个接受类似文件的输出函数(如下的output_function):

with open("file") as input:
    output_function(input)

我想在将input块发送到output_function 之前,对它们进行gzip压缩:

with open("file") as input:
    output_function(gzip_stream(input))

看起来 gzip 模块假定输入或输出是一个 gzip 文件...所以我想,zlib 模块可能是我需要的。

然而,它本身并没有提供一种简单的方式来创建类文件的流...而且它支持的流压缩是通过手动向压缩缓冲区添加数据,然后刷新该缓冲区来实现的。

当然,我可以写一个包装器来包装 zlib.Compress.compresszlib.Compress.flushCompress 是由 zlib.compressobj() 返回的),但我担心会弄错缓冲区大小或类似的问题。

那么,使用 Python 创建一个流式的、gzip 压缩的类文件最简单的方法是什么?

编辑:澄清一下,输入流和压缩输出流都太大了,无法放入内存,因此像 output_function(StringIO(zlib.compress(input.read()))) 这样的方法并不能解决问题。


我找到了一个相反的实现 - 一个类似文件的解压缩gzip流 - 在effbot上:http://effbot.org/librarybook/zlib.htm ... 但我正在寻找相反的东西(虽然我想如果需要编写自己的代码,这可能会有所帮助) - David Wolever
6个回答

15

虽然它有些笨重(自我引用等等;只花了几分钟写它,没有什么优雅的地方),但如果你仍然想使用 gzip 而不是直接使用 zlib,那么它可以实现你想要的功能。

基本上,GzipWrap 是一个(非常有限的)类文件对象,它可以将给定的可迭代对象(例如,类文件对象、字符串列表、任何生成器等)生成一个gzip文件。

当然,它生成的是二进制文件,因此实现"readline"没有意义。

你应该能够扩展它以覆盖其他情况或者将其用作可迭代对象本身。

from gzip import GzipFile

class GzipWrap(object):
    # input is a filelike object that feeds the input
    def __init__(self, input, filename = None):
        self.input = input
        self.buffer = ''
        self.zipper = GzipFile(filename, mode = 'wb', fileobj = self)

    def read(self, size=-1):
        if (size < 0) or len(self.buffer) < size:
            for s in self.input:
                self.zipper.write(s)
                if size > 0 and len(self.buffer) >= size:
                    self.zipper.flush()
                    break
            else:
                self.zipper.close()
            if size < 0:
                ret = self.buffer
                self.buffer = ''
        else:
            ret, self.buffer = self.buffer[:size], self.buffer[size:]
        return ret

    def flush(self):
        pass

    def write(self, data):
        self.buffer += data

    def close(self):
        self.input.close()

1
好的,我明白你的观点,将“self”传递给GzipFile并不是特别优雅……但我仍然认为这是一个巧妙的技巧。 - David Wolever
我已经在代码中纠正了一个小错误。当使用大小<0进行读取时,它没有清除缓冲区。我不认为你会像那样使用它,但是错误就是错误... O:) - Ricardo Cárdenes
6
标准库为什么没有提供这样的功能?一个不能流式传输的gzip工具有什么用?通常情况下,整个文件无法放入内存中(毕竟它是经过gzip压缩的)。 - Kevin
我猜他们把它包括在内部使用中了。不过我还没有检查它在过去一年里是否有改变... - Ricardo Cárdenes
1
当前版本的答案一次将一行读入内存。如果您要压缩的文件中换行符很少,这可能会成为一个问题。 - Talia

11

这是一份更清洁的版本,基于Ricardo Cárdenes非常有帮助的回答,但不会自我引用。

from gzip import GzipFile
from collections import deque


CHUNK = 16 * 1024


class Buffer (object):
    def __init__ (self):
        self.__buf = deque()
        self.__size = 0
    def __len__ (self):
        return self.__size
    def write (self, data):
        self.__buf.append(data)
        self.__size += len(data)
    def read (self, size=-1):
        if size < 0: size = self.__size
        ret_list = []
        while size > 0 and len(self.__buf):
            s = self.__buf.popleft()
            size -= len(s)
            ret_list.append(s)
        if size < 0:
            ret_list[-1], remainder = ret_list[-1][:size], ret_list[-1][size:]
            self.__buf.appendleft(remainder)
        ret = ''.join(ret_list)
        self.__size -= len(ret)
        return ret
    def flush (self):
        pass
    def close (self):
        pass


class GzipCompressReadStream (object):
    def __init__ (self, fileobj):
        self.__input = fileobj
        self.__buf = Buffer()
        self.__gzip = GzipFile(None, mode='wb', fileobj=self.__buf)
    def read (self, size=-1):
        while size < 0 or len(self.__buf) < size:
            s = self.__input.read(CHUNK)
            if not s:
                self.__gzip.close()
                break
            self.__gzip.write(s)
        return self.__buf.read(size)

优点:

  • 避免了重复的字符串拼接,这将导致整个字符串反复复制。
  • 从输入流中读取固定大小的CHUNK,而不是一次性读取整行(可以任意长)。
  • 避免了循环引用。
  • 避免了GzipCompressStream()的具有误导性的公共“write”方法,该方法实际上仅在内部使用。
  • 利用名称混淆来处理内部成员变量。

1
对于Python 3,请使用ret = b''.join(ret_list),如果您的输入流是字符串,则必须进行编码,例如self.__gzip.write(s.encode('utf-8') - dstandish

6

gzip模块支持将压缩后的内容写入到类似文件的对象中,只需要向GzipFile传递一个fileobj参数和一个filename。你传递的filename不需要存在,但是gzip头部有一个需要填写文件名的字段。

更新

这个解答是错误的。例如:

# tmp/try-gzip.py 
import sys
import gzip

fd=gzip.GzipFile(fileobj=sys.stdin)
sys.stdout.write(fd.read())

输出:

===> cat .bash_history  | python tmp/try-gzip.py  > tmp/history.gzip
Traceback (most recent call last):
  File "tmp/try-gzip.py", line 7, in <module>
    sys.stdout.write(fd.read())
  File "/usr/lib/python2.7/gzip.py", line 254, in read
    self._read(readsize)
  File "/usr/lib/python2.7/gzip.py", line 288, in _read
    pos = self.fileobj.tell()   # Save current position
IOError: [Errno 29] Illegal seek

嗯...我之前没有注意到这个问题...但我不确定它是否有效:要么fileobj必须是一个gzip压缩的输入流,要么是一个输出流,gzip数据将被写入其中。所以,虽然比没有好,但仍不完全符合我的期望。 - David Wolever
2
请解释一下为什么这个方法不能解决你的问题。我使用 fd=gzip.GzipFile(fileobj=fd),它可以正常工作。 - guettli
@guettli 作者期望 fd 对象没有 seek 方法。 - Andrey Cizov
@AndreyCizov 是的,你是对的。我已经检查过并编辑了答案。希望 user249290 没有反对。 - guettli

2

使用cStringIO(或StringIO)模块与zlib一起使用:

>>> import zlib
>>> from cStringIO import StringIO
>>> s.write(zlib.compress("I'm a lumberjack"))
>>> s.seek(0)
>>> zlib.decompress(s.read())
"I'm a lumberjack"

8
然而,这样做的问题是,在将输入流传递给 zlib.compress 时,整个输入流必须被加载到内存中,然后在从 zlib.decompress 返回时,必须再次将其加载到内存中。 - David Wolever
1
如果您使用StringIO,则它永远不会离开内存。您在问题中提到您想要一个“类似文件的对象”,这是Python常用术语,指具有类似于文件对象的方法的对象。它并没有说明它是否存在于磁盘上。但是,您还建议您不想要一个gz文件。您能否更清楚地说明您真正想要什么? - jcdyer
抱歉,是我错了。在我的理解中,“类似文件的对象”意味着“一些旨在以块为单位处理的东西”,但我想这是一个错误的假设。我已经更新了问题。 - David Wolever
1
你看过zlib.compressobj()zlib.decompressobj()吗?非常适合分块处理。 - jcdyer
是的,我有。正如我提到的(尽管不是很清楚),它们可以工作,但其界面不是很标准化,这可能取决于我是否正确设置缓冲区大小。 - David Wolever

0
这个(至少在Python 3中)是有效的:
with s3.open(path, 'wb') as f:
    gz = gzip.GzipFile(filename, 'wb', 9, f)
    gz.write(b'hello')
    gz.flush()
    gz.close()

这里使用gzip压缩对s3fs的文件对象进行写入。 关键在于f参数,它是GzipFile的fileobj。你需要为gzip的头信息提供一个文件名。

0

由可重用组件构建的更干净、更通用的版本:

gzipped_iter = igizip(io_iter(input_file_obj))
gzipped_file_obj = iter_io(prefetch(gzipped_iter))

上面的函数来自我的gist
  • iter_ioio_iter 提供了透明转换到/从Iterable[AnyStr]<->SupportsRead[AnyStr]
  • igzip 实现了流式gzip压缩
  • (可选) prefetch 通过线程并发地从底层可迭代对象中抽取数据,像正常迭代一样向消费者生成数据,支持并发读写。
def as_bytes(s: str | bytes):
    if type(s) not in [str, bytes]:
        raise TypeError
    return s.encode() if isinstance(s, str) else s


def iter_io(iterable: Iterable[AnyStr], buffer_size: int = io.DEFAULT_BUFFER_SIZE):
    """
    Returns a buffered file obj that reads bytes from an iterable of str/bytes.

    Example:

    iter_io(['abc', 'def', 'g']).read() == b'abcdefg'
    iter_io([b'abcd', b'efg']).read(5) == b'abcde'
    """
    class IterIO(io.RawIOBase):
        def __init__(self, iterable: Iterable[AnyStr]):
            self._leftover = b''
            self._iterable = (as_bytes(s) for s in iterable if s)

        def readable(self):
            return True

        def readinto(self, buf):
            try:
                chunk = self._leftover or next(self._iterable)
            except StopIteration:
                return 0    # indicate EOF

            output, self._leftover = chunk[:len(buf)], chunk[len(buf):]
            buf[:len(output)] = output
            return len(output)

    return io.BufferedReader(IterIO(iterable), buffer_size=buffer_size)


def io_iter(fo: SupportsRead[AnyStr], size: int = io.DEFAULT_BUFFER_SIZE):
    """
    Returns an iterator that reads from a file obj in sized chunks.

    Example:

    list(io_iter(io.StringIO('abcdefg'), 3)) == ['abc', 'def', 'g']
    list(io_iter(io.BytesIO(b'abcdefg'), 4)) == [b'abcd', b'efg']

    Usage notes/TODO:
     * file obj isn't closed, fix /w keep_open=False and an internal contextmanager
    """
    return iter(lambda: fo.read(size), fo.read(0))


def igzip(chunks: Iterable[AnyStr], level=zlib.Z_DEFAULT_COMPRESSION):
    """
    Streaming gzip: lazily compresses an iterable of bytes or str (utf8)

    Example:

    gzipped_bytes_iter = igzip(['hello ', 'world!'])
    gzip.decompress(b''.join(gzipped_bytes_iter)).encode() == 'hello world!'
    """
    def gen():
        gzip_format = 0b10000
        c = zlib.compressobj(level=level, wbits=zlib.MAX_WBITS + gzip_format)

        yield from (c.compress(as_bytes(chunk)) for chunk in chunks)
        yield c.flush()

    return filter(None, gen())


def prefetch(iterable: Iterable[Any], n: int = 1) -> Iterator[Any]:
    """
    Prefetch an iterable via thread, yielding original contents as normal.

    Example:

    def slow_produce(*args):
        for x in args:
            time.sleep(1)
            yield x

    def slow_consume(iterable):
        for _ in iterable:
            time.sleep(1)

    slow_consume(prefetch(slow_produce('a', 'b')))  # takes 3 sec, not 4

    # Prefetch
    # produce: | 'a' | 'b' |
    # consume:       | 'a' | 'b' |
    # seconds: 0 --- 1 --- 2 --- 3

    # No prefetch
    # produce: | 'a' |     | 'b' |
    # consume:       | 'a' |     | 'b' |
    # seconds: 0 --- 1 --- 2 --- 3 --- 4

    Usage notes/TODO:
     * mem leak: Thread is GC'd only after iterable is fully consumed, fix /w __del__
    """
    queue = Queue(n)
    finished = object()

    def produce():
        for x in iterable:
            queue.put(x)
        queue.put(finished)

    t = Thread(target=produce, daemon=True)
    t.start()

    while True:
        item = queue.get()
        if item is finished:
            break
        else:
            yield item

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接