Python如何解压字节流?

36

以下是情况:

  • 我从亚马逊S3获取gzip压缩的XML文档

  import boto
  from boto.s3.connection import S3Connection
  from boto.s3.key import Key
  conn = S3Connection('access Id', 'secret access key')
  b = conn.get_bucket('mydev.myorg')
  k = Key(b)
  k.key('documents/document.xml.gz')
  • 我从文件中读取它们的方式是

      import gzip
      f = open('/tmp/p', 'w')
      k.get_file(f)
      f.close()
      r = gzip.open('/tmp/p', 'rb')
      file_content = r.read()
      r.close()
    
  • 问题

    如何直接解压缩流并读取内容?

    我不想创建临时文件,它们看起来不好。

    4个回答

    41

    是的,你可以使用zlib模块来解压字节流:

    import zlib
    
    def stream_gzip_decompress(stream):
        dec = zlib.decompressobj(32 + zlib.MAX_WBITS)  # offset 32 to skip the header
        for chunk in stream:
            rv = dec.decompress(chunk)
            if rv:
                yield rv
        if dec.unused_data:
            # decompress and yield the remainder
            yield dec.flush()
    

    32的偏移量信号给zlib头部指示gzip头部被期望但被跳过。
    S3键对象是一个迭代器,所以你可以这样做:
    for data in stream_gzip_decompress(k):
        # do something with the decompressed data
    

    感谢您的回复,@MartijnPieters!奇怪的是,这似乎没有解决问题。(对于以下1行代码表示歉意)dec = zlib.decompressobj(32 + zlib.MAX_WBITS); for chunk in app.s3_client.get_object(Bucket=bucket, Key=key)["Body"].iter_chunks(2 ** 19): data = dec.decompress(chunk); print(len(data));输出65505,然后是0、0、0、0、0、……这可能与iter_chunks()有关吗? - WillJones
    @WillJones:请单独发布一个问题,这不是我们可以在评论中解决的事情。抱歉! - Martijn Pieters
    没问题,@Martijn!我现在就翻译。 - WillJones
    刚刚在这里创建了一个问题:https://stackoverflow.com/questions/61048597/ungzipping-chunks-of-bytes-from-from-s3-using-iter-chunks 谢谢您的关注! - WillJones
    这需要在最后调用dec.flush()来确保不会错过任何数据吗? - Michal Charemza
    1
    @MichalCharemza:是的,这是一个非常好的观点。在最后加上if dec.unused_data: yield dec.flush() - Martijn Pieters

    10

    我曾经也需要做同样的事情,这是我所采用的方法:

    import gzip
    f = StringIO.StringIO()
    k.get_file(f)
    f.seek(0) #This is crucial
    gzf = gzip.GzipFile(fileobj=f)
    file_content = gzf.read()
    

    3
    这里的“k”是什么? - Alex R

    6

    针对Python3x和boto3-

    因此,我使用BytesIO将压缩文件读入缓冲区对象,然后使用zipfile将解压流作为未压缩数据打开,从而逐行获取数据。

    import io
    import zipfile
    import boto3
    import sys
    
    s3 = boto3.resource('s3', 'us-east-1')
    
    
    def stream_zip_file():
        count = 0
        obj = s3.Object(
            bucket_name='MonkeyBusiness',
            key='/Daily/Business/Banana/{current-date}/banana.zip'
        )
        buffer = io.BytesIO(obj.get()["Body"].read())
        print (buffer)
        z = zipfile.ZipFile(buffer)
        foo2 = z.open(z.infolist()[0])
        print(sys.getsizeof(foo2))
        line_counter = 0
        for _ in foo2:
            line_counter += 1
        print (line_counter)
        z.close()
    
    
    if __name__ == '__main__':
        stream_zip_file()
    

    1
    我注意到当我们执行 buffer = io.BytesIO(obj.get()["Body"].read()) 时,内存消耗会显著增加。然而,使用 read(1024) 只读取数据的某个部分可以保持内存使用率较低! - user 923227
    8
    buffer = io.BytesIO(obj.get()["Body"].read())代码会将整个文件读入内存。 - Kirk Broadhurst

    0

    您可以尝试使用PIPE,在不下载文件的情况下读取内容

        import subprocess
        c = subprocess.Popen(['-c','zcat -c <gzip file name>'], shell=True, stdout=subprocess.PIPE,         stderr=subprocess.PIPE)
        for row in c.stdout:
          print row
    

    此外,"/dev/fd/" + str(c.stdout.fileno()) 将为您提供FIFO文件名(命名管道),可以传递给其他程序。

    但是你会将来自S3的压缩字节传递给zcat吗? - Ciprian Tomoiagă
    zcat is not particularly portable, you'd better use gunzip -c - Eli Korvigo
    5
    光是为了进行这个操作就花费很多精力来设置流程等是完全不正确的做法。 - Chris Withers
    1
    我完全同意,CW。但我很喜欢看到它被提供。 - mohawkTrail

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接