使用Flask生成并流式传输压缩文件

15

我能够动态生成并流式传输文本,但无法动态生成并流式传输压缩文件。

from flask import Flask, request, Response,stream_with_context
import zlib
import gzip

app = Flask(__name__)

def generate_text():
    for x in range(10000):
        yield f"this is my line: {x}\n".encode()

@app.route('/stream_text')
def stream_text():
    response = Response(stream_with_context(generate_text()))
    return response

def generate_zip():
    for x in range(10000):
        yield zlib.compress(f"this is my line: {x}\n".encode())

@app.route('/stream_zip')
def stream_zip():
    response = Response(stream_with_context(generate_zip()), mimetype='application/zip')
    response.headers['Content-Disposition'] = 'attachment; filename=data.gz'
    return response

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000, debug=True)

不如使用curl和gunzip:

curl http://127.0.0.1:8000/stream_zip > data.gz

gunzip data.gz
gunzip: data.gz: not in gzip format

无论是zip、gzip还是其他类型的压缩格式,我都不在意。

我的真实代码中generate_text会生成超过4GB的数据,因此我希望能够即时压缩。

将文本保存到文件中,进行压缩,返回压缩文件,然后再删除文件并不是我想要的解决方案。

我需要在循环中生成一些文本——压缩这些文本——流式传输已压缩的数据,直到完成。

zip/gzip任何一种都可以,只要能用就行。

4个回答

22

你正在生成一系列压缩文档,而不是单个压缩流。不要使用zlib.compress(),它包括头部并形成单个文档。

相反,你需要创建一个zlib.compressobj()对象,并在该对象上使用Compress.compress()方法来生成数据流(紧接着是对Compress.flush()的最终调用):

def generate_zip():
    compressor = zlib.compressobj()
    for x in range(10000):
        chunk = compressor.compress(f"this is my line: {x}\n".encode())
        if chunk:
            yield chunk
    yield compressor.flush()

当压缩器还没有足够的数据来生成完整的压缩数据块时,它可以生成空块。只有在实际有数据要发送时才会产生上述结果。由于您的输入数据非常重复,因此可以高效地进行压缩,这将仅产生三次结果(一次是带有 2 字节标头,一次是涵盖 range() 的前 8288 次迭代的约 21kb 压缩数据,最后是剩余的 4kb 用于循环的其余部分)。

总体而言,这产生的数据与使用所有输入连接的单个 zlib.compress() 调用相同。此数据格式的正确 MIME 类型为 application/zlib,而不是 application/zip

但是,这种格式无法直接使用 gzip 进行解压缩,也不能够轻松地解压缩。这是因为上述方法尚未生成一个GZIP文件,它只是生成了一个原始的 zlib 压缩流。为了使它与 GZIP 兼容,您需要正确地配置压缩,先发送一个标头,并在结尾添加CRC 校验和数据长度值

import zlib
import struct
import time

def generate_gzip():
    # Yield a gzip file header first.
    yield bytes([
        0x1F, 0x8B, 0x08, 0x00,  # Gzip file, deflate, no filename
        *struct.pack('<L', int(time.time())),  # compression start time
        0x02, 0xFF,  # maximum compression, no OS specified
    ])

    # bookkeeping: the compression state, running CRC and total length
    compressor = zlib.compressobj(
        9, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
    crc = zlib.crc32(b"")
    length = 0

    for x in range(10000):
        data = f"this is my line: {x}\n".encode()
        chunk = compressor.compress(data)
        if chunk:
            yield chunk
        crc = zlib.crc32(data, crc) & 0xFFFFFFFF
        length += len(data)

    # Finishing off, send remainder of the compressed data, and CRC and length
    yield compressor.flush()
    yield struct.pack("<2L", crc, length & 0xFFFFFFFF)

请将此内容作为application/gzip呈现:

@app.route('/stream_gzip')
def stream_gzip():
    response = Response(stream_with_context(generate_gzip()), mimetype='application/gzip')
    response.headers['Content-Disposition'] = 'attachment; filename=data.gz'
    return response

结果可以即时进行解压缩:

curl http://127.0.0.1:8000/stream_gzip | gunzip -c | less

迁移到py3 https://gist.github.com/viewpointsa/b6be251d26af18456ad23841e56ddcb8 - themadmax
@themadmax:感谢提醒,我已经更新了我的帖子(采用清洁室风格,以便在这里刷新我的记忆,并更新了问题)。 - Martijn Pieters

1

虽然我对Martijn的解决方案印象非常深刻,但我决定自己编写一个使用pigz以获得更好性能的解决方案:

def yield_pigz(results, compresslevel=1):
    cmd = ['pigz', '-%d' % compresslevel]
    pigz_proc = subprocess.Popen(cmd, bufsize=0,
        stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    def f():
        for result in results:
            pigz_proc.stdin.write(result)
            pigz_proc.stdin.flush()
        pigz_proc.stdin.close()
    try:
        t = threading.Thread(target=f)
        t.start()
        while True:
            buf = pigz_proc.stdout.read(4096)
            if len(buf) == 0:
                break
            yield buf
    finally:
        t.join()
        pigz_proc.wait()

请记住,为了使此代码正常工作,您需要导入subprocessthreading。您还需要安装pigz程序(大多数Linux发行版的存储库中已经包含--在Ubuntu上,只需使用sudo apt install pigz -y命令即可)。

示例用法:

from flask import Flask, Response
import subprocess
import threading
import random

app = Flask(__name__)

def yield_something_random():
    for i in range(10000):
        seq = [chr(random.randint(ord('A'), ord('Z'))) for c in range(1000)]
        yield ''.join(seq)

@app.route('/')
def index():
    return Response(yield_pigz(yield_something_random()))

这是毫无意义的,恐怕这里没有任何好处。pigz 的工作方式是将输入分成单独的块,可以逐个压缩(成为 128 KB 的块),但是您正在提供一个数据流,并通过管道传输数据。您不会以足够快的速度编写数据,以便 pigz 找到更多块,如果有的话,同时增加了启动单独进程并通过管道来回传递数据的开销。每次请求都要启动一个单独的进程,这真的会拖累可扩展性。 - Martijn Pieters
当你有一个本地文件,可以从磁盘的不同块中读取整个系列的块,并传递给每个压缩线程时,Pigz非常好用。 - Martijn Pieters

-2

我认为目前你只是发送了生成器而不是数据!你可能想要做这样的事情(我还没有测试过,所以可能需要一些更改):

def generate_zip():
    import io
    with gzip.GzipFile(fileobj=io.BytesIO(), mode='w') as gfile:
        for x in xrange(10000):
             gfile.write("this is my line: {}\n".format(x))
    return gfile.read()

这就是生成器的全部意义,它在其他地方被迭代。问题出在使用zlib.compress()上。你的解决方案完全删除了流式传输的能力。 - Martijn Pieters

-2

使用低内存消耗的方式工作 generate_zip() :)

def generate_zip():
    buff = io.BytesIO()
    gz = gzip.GzipFile(mode='w', fileobj=buff)
    for x in xrange(10000):
        gz.write("this is my line: {}\n".format(x))
        yield buff.read()
        buff.truncate()
    gz.close()
    yield buff.getvalue()

直到最后一个yield,这实际上不会返回任何数据。 - mattjvincent
这将产生10k个空字符串,然后是整个文档。这也是过度的。没有必要在内存中保留单独的文件对象,只需直接流式传输压缩即可。如果您确实想使用GzipFile()对象,为什么不直接捕获write()调用,而不是使用BytesIO()对象? - Martijn Pieters
只有在循环中使用 buff.getvalue()(而不是 buff.read()),并使用 buff.truncate(0) 才能修复您的方法(您正在将其截断到当前位置,而不是到开头)。为避免产生空块,您可能希望测试 buff.getvalue() 是否实际产生了任何内容,仅在它确实产生时才进行 yield 和 truncate。 - Martijn Pieters

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接