高效地将多个zlib压缩的数据流连接成一个单一的数据流

8
如果我有多个包含压缩zlib数据的二进制字符串,是否有一种高效的方法将它们合并成一个压缩字符串,而不需要解压所有内容?
现在我需要做的示例:
c1 = zlib.compress("The quick brown fox jumped over the lazy dog. ")
c2 = zlib.compress("We ride at dawn! ")
c = zlib.compress(zlib.decompress(c1)+zlib.decompress(c2)) # Warning: Inefficient!

d1 = zlib.decompress(c1)
d2 = zlib.decompress(c2)
d = zlib.decompress(c)

assert d1+d2 == d # This will pass!

我想要的例子:

c1 = zlib.compress("The quick brown fox jumped over the lazy dog. ")
c2 = zlib.compress("We ride at dawn! ")
c = magic_zlib_add(c1+c2) # Magical method of combining compressed streams

d1 = zlib.decompress(c1)
d2 = zlib.decompress(c2)
d = zlib.decompress(c)

assert d1+d2 == d # This should pass!

我对zlib和DEFLATE算法并不是很了解,因此从理论上讲这可能完全不可能。另外,我必须使用zlib;因此,我不能包装zlib并提出自己的协议,以透明地处理串联流。
注:如果解决方案在Python中不是很简单,我并不介意。我愿意编写一些C代码,并在Python中使用ctypes。
3个回答

8

如果您不介意涉及C语言,可以从查看gzjoin代码开始。

请注意,gzjoin代码需要解压以查找合并时需要更改的部分,但不必重新压缩。这并不太糟糕,因为解压通常比压缩快。


这并不像我所希望的那样简单,但它绝对比我以前做的更有效率。谢谢。 - DSnet
2
具体回答这个问题,你不能将两个deflate流合并而不解压第一个流,就像gzjoin所做的那样。但是,如果你控制创建第一个deflate流,它可以被特别准备以便在不解压的情况下附加到它上面。 - Mark Adler
哇塞,@MarkAdler关于压缩的评论就像从达赖喇嘛那里听到有关寻找内心平静的建议一样。您能详细说明一下如何控制第一个deflate流吗?我们正在尝试使用Z_SYNC_FULL来使用zlib-streams实现类似的功能。 - meawoppl
5
你可以使用Z_SYNC_FLUSH(或Z_FULL_FLUSH - 在这种情况下无所谓),完成通缩,确保在此之后获得所有压缩数据(strm.avail_out!= 0),然后跟随一个Z_FINISH。 然后,您可以剥离流的最后两个字节,并直接将另一个压缩流连接到该流。 - Mark Adler
2
为了找到内心的平静,接受自己的本来面目,也要接受他人的真实模样。 - Mark Adler
5
@MarkAdler 感谢您指出了正确的方向。我正在与meawoppl一起开发一个并行化的PNG压缩器。我们无法使用您建议的方法连接两个流,但我们能够通过以下方式使其工作:对于第一个流,请使用Z_SYNC_FLUSH调用deflate。对于每个后续流,请使用Z_SYNC_FLUSH调用deflate,剥离前两个字节,然后连接,同时收集adler32值和未压缩长度。然后对于最后一个流,使用Z_FINISH进行压缩,剥离4字节的校验和,并用所有校验和的adler32_combine()替换它。 - zorlak

6
除了需要解压第一个deflate流的gzjoin之外,您还可以查看gzlog.hgzlog.c,它们可以有效地将短字符串附加到gzip文件中,而无需每次解压缩deflate流。 (它可以轻松修改为在gzip-wrapped deflate数据而不是zlib-wrapped deflate数据上运行。)如果您控制第一个deflate流的创建,则应使用此方法。如果您没有创建第一个deflate流,则必须使用需要解压缩的gzjoin方法。

所有方法都不需要重新压缩。


1
很高兴知道zlib的作者正在回答这个问题。:D我会研究这些的。感谢您对自由软件世界的贡献! - DSnet
你说 gzjoin 技术只需要解压缩第一个流。然而,代码似乎对所有流程都一视同仁(并在它们上调用 inflate())。我理解错了吗?另外,你说如果第一个流是特别构造的,那么不需要解压缩。是否有可能特别构造一个没有原始输入数据的流,然后每次将其用作第一个流? - Kannan Goundan
2
您可以通过仅解码第一个流来组合两个deflate流。然而,gzip流并不总是在字节的末尾结束,因此gzjoin.c也会解码最后一个流,以确保它知道deflate数据实际上在哪里结束,从而正确定位crc和长度。 - Mark Adler
1
五个字节 0x00、0x00、0x00、0xff、0xff 是一个以字节边界结束的零长度存储块。因此,您可以将任何压缩流连接到该块上。然而,我不清楚这对您有什么用处。 - Mark Adler
假设我有两个10MB的deflate流,我想要将它们连接起来。你说我们只需要解压缩第一个流就可以将它们连接起来。我们能否假装我们正在连接三个流--特殊的零长度流,后面跟着两个10MB的流?这样我们只需要解压缩零长度流而不是第一个10MB的流。我是否有什么误解?(顺便说一句,感谢您抽出时间回答!) - Kannan Goundan
由于您构建了长度为零的流,因此无需对其进行解压缩。 您可以将其构建为字节对齐且最后一位未设置,因此可以直接将其前置。 您始终可以在deflate流之前附加“0x00 0x00 0x00 0xff 0xff”(空存储块)。 或“0x02 0x08 0x20 0x80 0x00”(四个空固定码块)。 - Mark Adler

2
我只是把 @zorlak的评论 转化成了一个答案,并添加了一些代码,以便我以后可以找到它。
引用块: 对于第一个流,请使用 Z_SYNC_FLUSH 调用 deflate()。对于每个后续流,请使用 Z_SYNC_FLUSH 调用 deflate(),剥离前两个字节并连接,同时收集 Adler32 值和未压缩长度。然后对于最后一个流,[调用] deflate() 使用 Z_FINISH,剥离 4 字节的校验和,并将其替换为所有校验和的 adler32_combine()
如果您可以控制流的初始压缩,请在某个地方存储未压缩数据的长度、其 Adler-32 校验和和压缩数据。稍后,您可以按任意顺序连接各个单独的流。
请注意,我不确定单独的流是否可以具有不同的压缩级别、压缩策略或窗口大小,因为 concatenate 函数会剥离除第一个流以外的所有 zlib 头部...
from typing import Tuple
import zlib


def prepare(data: bytes) -> Tuple[int, bytes, int]:
    deflate = zlib.compressobj()
    result = deflate.compress(data)
    result += deflate.flush(zlib.Z_SYNC_FLUSH)
    return len(data), result, zlib.adler32(data)


def concatenate(*chunks: Tuple[int, bytes, int]) -> bytes:
    if not chunks:
        return b''
    _, result, final_checksum = chunks[0]
    for length, chunk, checksum in chunks[1:]:
        result += chunk[2:]  # strip the zlib header
        final_checksum = adler32_combine(final_checksum, checksum, length)
    result += b'\x03\x00'  # insert a final empty block
    result += final_checksum.to_bytes(4, byteorder='big')
    return result


def adler32_combine(adler1: int, adler2: int, length2: int) -> int:
    # Python implementation of adler32_combine
    # The orignal C implementation is Copyright (C) 1995-2011, 2016 Mark Adler
    # see https://github.com/madler/zlib/blob/master/adler32.c#L143
    BASE = 65521
    WORD = 0xffff
    DWORD = 0xffffffff
    if adler1 < 0 or adler1 > DWORD:
        raise ValueError('adler1 must be between 0 and 2^32')
    if adler2 < 0 or adler2 > DWORD:
        raise ValueError('adler2 must be between 0 and 2^32')
    if length2 < 0:
        raise ValueError('length2 must not be negative')

    remainder = length2 % BASE
    sum1 = adler1 & WORD
    sum2 = (remainder * sum1) % BASE
    sum1 += (adler2 & WORD) + BASE - 1
    sum2 += ((adler1 >> 16) & WORD) + ((adler2 >> 16) & WORD) + BASE - remainder
    if sum1 >= BASE:
        sum1 -= BASE
    if sum1 >= BASE:
        sum1 -= BASE
    if sum2 >= (BASE << 1):
        sum2 -= (BASE << 1)
    if sum2 >= BASE:
        sum2 -= BASE

    return (sum1 | (sum2 << 16))

一个快速的例子:
hello = prepare(b'Hello World! ')
test = prepare(b'This is a test. ')
fox = prepare(b'The quick brown fox jumped over the lazy dog. ')
dawn = prepare(b'We ride at dawn! ')

# these all print what you would expect
print(zlib.decompress(concatenate(hello, test, fox, dawn)))
print(zlib.decompress(concatenate(dawn, fox, test, hello)))
print(zlib.decompress(concatenate(fox, hello, dawn, test)))
print(zlib.decompress(concatenate(test, dawn, hello, fox)))

太棒了!我在 Julia 中使用了类似的版本,效果非常好。 - phyatt

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接