Java顺序解压GZIP流

4
我的Java程序实现了一个服务器,应该从客户端通过Websockets获取一个非常大的文件,该文件使用gzip进行压缩,并且应该检查文件内容中的一些字节模式。
客户端发送嵌入在专有协议中的文件块,因此我从客户端获得消息后,解析消息并提取gzipped文件内容。
我无法将整个文件保存在程序内存中,因此我正在尝试解压每个块,处理数据并继续下一个块。
我正在使用以下代码:
public static String gzipDecompress(byte[] compressed) throws IOException {
    String uncompressed;
    try (
        ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
        GZIPInputStream gis = new GZIPInputStream(bis);
        Reader reader = new InputStreamReader(gis);
        Writer writer = new StringWriter()
    ) {

      char[] buffer = new char[10240];
      for (int length = 0; (length = reader.read(buffer)) > 0; ) {
        writer.write(buffer, 0, length);
      }
      uncompressed = writer.toString();
    }

    return uncompressed;
  }

但是在使用第一个压缩块调用函数时,我遇到了以下异常:
java.io.EOFException: Unexpected end of ZLIB input stream
    at java.util.zip.InflaterInputStream.fill(InflaterInputStream.java:240)
    at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:158)
    at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.Reader.read(Reader.java:140)

重要的是要提到我没有跳过任何数据块并且正在按顺序逐个解压缩数据块。 我错过了什么吗?

1
起初并不清楚这些数据来自何处。您应该创建一个流,它将读取所有数据,并将其包装在GZipInputStream中。它不需要将所有数据都存储在内存中,但它应该是一个单一的流。 - Jon Skeet
2个回答

6
问题在于您手动处理这些块。

正确的方法应该是获取一些InputStream,将其包装在GZIPInputStream中,然后读取数据。

    InputStream is = // obtain the original gzip stream

    GZIPInputStream gis = new GZIPInputStream(is);
    Reader reader = new InputStreamReader(gis);

    //... proceed reading and so on

GZIPInputStream 以流的方式工作,因此如果您只从reader中一次请求10KB,那么无论初始 GZIP 文件的大小如何,整体的内存占用量将很低。

在问题更新后更新

你的情况可能有一个解决方案,就是编写一个 InputStream 实现,该实现通过客户端协议处理程序以块的形式流传输字节。

这里是一个原型:

public class ProtocolDataInputStream extends InputStream {
    private BlockingQueue<byte[]> nextChunks = new ArrayBlockingQueue<byte[]>(100);
    private byte[] currentChunk = null;
    private int currentChunkOffset = 0;
    private boolean noMoreChunks = false;

    @Override
    public synchronized int read() throws IOException {
        boolean takeNextChunk = currentChunk == null || currentChunkOffset >= currentChunk.length;
        if (takeNextChunk) {
            if (noMoreChunks) {
                // stream is exhausted
                return -1;
            } else {
                currentChunk = nextChunks.take();
                currentChunkOffset = 0;
            }
        }
        return currentChunk[currentChunkOffset++];
    }

    @Override
    public synchronized int available() throws IOException {
        if (currentChunk == null) {
            return 0;
        } else {
            return currentChunk.length - currentChunkOffset;
        }
    }

    public synchronized void addChunk(byte[] chunk, boolean chunkIsLast) {
        nextChunks.add(chunk);
        if (chunkIsLast) {
            noMoreChunks = true;
        }
    }
}

您的客户端协议处理程序使用 addChunk() 添加字节块,而解压代码从该流中提取数据(通过 Reader)。

请注意,此代码存在一些问题:

  1. 使用的队列具有有限的大小。如果过于频繁地调用 addChunk(),则队列可能会被填满,这将阻塞 addChunk()。这可能是可取的,也可能不是。
  2. 仅为说明目的而实现了 read() 方法。为了性能更好,最好以相同方式实现 read(byte[])
  3. 保守的同步在假定读者(解压器)和写作者(调用 addChunk() 的协议处理程序)是不同线程的情况下使用。
  4. InterruptedExceptiontake() 上未受理以避免过多细节。

如果您的解压程序和 addChunk() 在同一个线程(在同一个循环中)中执行,则可以尝试在使用 InputStream 时拉取时使用 InputStream.available() 方法或在使用 Reader 拉取时使用 Reader.ready()


1
我能不能使用ByteArrayInputStream或其他将字节数组包装成InputStream的方式作为我传递给GZIPInputStream的输入流呢?在我的情况下,我无法真正使用从服务器获取数据的原始InputStream。 - Eldad
为什么不能使用原始的 InputStream?我所知道的唯一安全的将字节提供给 GZIPInputStream 的方法是首先将所有字节都读入内存,这并不适用于大型文件。 - Roman Puchkovskiy
我添加了详细信息以更好地描述情况。我正在获取嵌入在专有协议中的文件块,因此我的InputStream会得到完整的协议消息,解析它,然后从中提取文件块,只有这样才能解压缩块。我无法控制客户端,也不知道包含下一个文件块的下一条消息何时到达。感谢您的理解,对于糟糕的描述我深表歉意。 - Eldad

2
来自gzip流的任意字节序列都不是有效的独立gzip数据。你必须以某种方式将所有字节块连接起来。最简单的方法是使用一个简单的管道将它们全部累积起来:
import java.io.PipedOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;

public class ChunkInflater {
    private final PipedOutputStream pipe;

    private final InputStream stream;

    public ChunkInflater()
    throws IOException {
        pipe = new PipedOutputStream();
        stream = new GZIPInputStream(new PipedInputStream(pipe));
    }

    public InputStream getInputStream() {
        return stream;
    }

    public void addChunk(byte[] compressedChunk)
    throws IOException {
        pipe.write(compressedChunk);
    }
}

现在你有了一个InputStream,可以按照任意大小的增量进行读取。例如:
ChunkInflater inflater = new ChunkInflater();

Callable<Void> chunkReader = new Callable<Void>() {
    @Override
    public Void call()
    throws IOException {
        byte[] chunk;
        while ((chunk = readChunkFromSource()) != null) {
            inflater.addChunk(chunk);
        }

        return null;
    }
};
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(chunkReader);
executor.shutdown();

Reader reader = new InputStreamReader(inflater.getInputStream());
// read text here

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接