GZIPInputStream closes prematurely when decompressing an HTTPInputStream


Question

See the updated question in the Edit section below.

I am trying to decompress a large (~300MB) GZIP file coming from Amazon S3 on the fly using GZIPInputStream, but it only outputs a portion of the file. However, if I download the file to the filesystem before decompressing, GZIPInputStream decompresses the entire file.

How can I get GZIPInputStream to decompress the entire HTTPInputStream instead of just the first part of it?

What I Have Tried

See the updates in the Edit section below.

I suspect this is an HTTP problem, except that no exception is ever thrown, GZIPInputStream returns a fairly consistent chunk of the file each time, and, as far as I can tell, it always breaks on a WET record boundary, although the boundary it picks differs for each URL (which is very strange, since everything is being treated as a binary stream and no parsing of the WET records in the file is happening at all).

The closest question I could find is "GZIPInputStream is prematurely closed when reading from s3". The answer to that question was that some GZIP files are actually multiple concatenated GZIP files, which GZIPInputStream does not handle well. However, if that is the case, why does GZIPInputStream work fine on a local copy of the same file?

Demonstration Code and Output

Below is a piece of sample code that demonstrates the problem I am seeing. I have tested it with Java 1.8.0_72 and 1.8.0_112 on two different Linux machines and got similar results. I expect the number of bytes read from the decompressed HTTPInputStream to be identical to the number of bytes read from the decompressed local copy of the file, but the decompressed HTTPInputStream is much smaller.

Testing URL https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698540409.8/wet/CC-MAIN-20161202170900-00009-ip-10-31-129-80.ec2.internal.warc.wet.gz
Testing HTTP Input Stream direct to GZIPInputStream
Testing saving to file before decompression
Read 87894 bytes from HTTP->GZIP
Read 448974935 bytes from HTTP->file->GZIP
Output from HTTP->GZIP saved to file testfile0.wet
------
Testing URL https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698540409.8/wet/CC-MAIN-20161202170900-00040-ip-10-31-129-80.ec2.internal.warc.wet.gz
Testing HTTP Input Stream direct to GZIPInputStream
Testing saving to file before decompression
Read 1772936 bytes from HTTP->GZIP
Read 451171329 bytes from HTTP->file->GZIP
Output from HTTP->GZIP saved to file testfile40.wet
------
Testing URL https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698541142.66/wet/CC-MAIN-20161202170901-00500-ip-10-31-129-80.ec2.internal.warc.wet.gz
Testing HTTP Input Stream direct to GZIPInputStream
Testing saving to file before decompression
Read 89217 bytes from HTTP->GZIP
Read 453183600 bytes from HTTP->file->GZIP
Output from HTTP->GZIP saved to file testfile500.wet

import java.net.*;
import java.io.*;
import java.util.zip.GZIPInputStream;
import java.nio.channels.*;

public class GZIPTest {
    public static void main(String[] args) throws Exception {
        // Our three test files from CommonCrawl
        URL url0 = new URL("https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698540409.8/wet/CC-MAIN-20161202170900-00009-ip-10-31-129-80.ec2.internal.warc.wet.gz");
        URL url40 = new URL("https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698540409.8/wet/CC-MAIN-20161202170900-00040-ip-10-31-129-80.ec2.internal.warc.wet.gz");
        URL url500 = new URL("https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698541142.66/wet/CC-MAIN-20161202170901-00500-ip-10-31-129-80.ec2.internal.warc.wet.gz");

        /*
         * Test the URLs and display the results
         */
        test(url0, "testfile0.wet");
        System.out.println("------");
        test(url40, "testfile40.wet");
        System.out.println("------");
        test(url500, "testfile500.wet");
    }

    public static void test(URL url, String testGZFileName) throws Exception {
        System.out.println("Testing URL "+url.toString());

        // First directly wrap the HTTPInputStream with GZIPInputStream
        // and count the number of bytes we read
        // Go ahead and save the extracted stream to a file for further inspection
        System.out.println("Testing HTTP Input Stream direct to GZIPInputStream");
        int bytesFromGZIPDirect = 0;
        URLConnection urlConnection = url.openConnection();
        FileOutputStream directGZIPOutStream = new FileOutputStream("./"+testGZFileName);

        // FIRST TEST - Decompress from HTTPInputStream
        GZIPInputStream gzipishttp = new GZIPInputStream(urlConnection.getInputStream());

        byte[] buffer = new byte[1024];
        int bytesRead = -1;
        while ((bytesRead = gzipishttp.read(buffer, 0, 1024)) != -1) {
            bytesFromGZIPDirect += bytesRead;
            directGZIPOutStream.write(buffer, 0, bytesRead); // save to file for further inspection
        }
        gzipishttp.close();
        directGZIPOutStream.close();

        // Now save the GZIPed file locally
        System.out.println("Testing saving to file before decompression");
        int bytesFromGZIPFile = 0;
        ReadableByteChannel rbc = Channels.newChannel(url.openStream());
        FileOutputStream outputStream = new FileOutputStream("./test.wet.gz");
        outputStream.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
        outputStream.close();
        rbc.close();

        // SECOND TEST - decompress from FileInputStream
        GZIPInputStream gzipis = new GZIPInputStream(new FileInputStream("./test.wet.gz"));

        buffer = new byte[1024];
        bytesRead = -1;
        while((bytesRead = gzipis.read(buffer, 0, 1024)) != -1) {
            bytesFromGZIPFile += bytesRead;
        }
        gzipis.close();

        // The Results - these numbers should match but they don't
        System.out.println("Read "+bytesFromGZIPDirect+" bytes from HTTP->GZIP");
        System.out.println("Read "+bytesFromGZIPFile+" bytes from HTTP->file->GZIP");
        System.out.println("Output from HTTP->GZIP saved to file "+testGZFileName);
    }

}

EDIT

Per @VGR's comment, the streams and the associated channel are now closed in the demonstration code.

UPDATE:

The problem does appear to be specific to the files. I pulled the Common Crawl WET archive down locally (wget), decompressed it (gunzip 1.8), recompressed it (gzip 1.8), and uploaded it back to S3, after which on-the-fly decompression worked fine. You can see the test if you modify the sample code above to include the following lines:

// Original file from CommonCrawl hosted on S3
URL originals3 = new URL("https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698540409.8/wet/CC-MAIN-20161202170900-00009-ip-10-31-129-80.ec2.internal.warc.wet.gz");
// Recompressed file hosted on S3
URL rezippeds3 = new URL("https://s3-us-west-1.amazonaws.com/com.jeffharwell.commoncrawl.gziptestbucket/CC-MAIN-20161202170900-00009-ip-10-31-129-80.ec2.internal.warc.wet.gz");

test(originals3, "originalhost.txt");
test(rezippeds3, "rezippedhost.txt");

The URL rezippeds3 points to the WET archive file that I downloaded, decompressed, recompressed, and then re-uploaded to S3. You will see the following output:

Testing URL https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698540409.8/wet/CC-MAIN-20161202170900-00009-ip-10-31-129-80.ec2.internal.warc.wet.gz
Testing HTTP Input Stream direct to GZIPInputStream
Testing saving to file before decompression
Read 7212400 bytes from HTTP->GZIP
Read 448974935 bytes from HTTP->file->GZIP
Output from HTTP->GZIP saved to file originals3.txt
-----
Testing URL https://s3-us-west-1.amazonaws.com/com.jeffharwell.commoncrawl.gziptestbucket/CC-MAIN-20161202170900-00009-ip-10-31-129-80.ec2.internal.warc.wet.gz
Testing HTTP Input Stream direct to GZIPInputStream
Testing saving to file before decompression
Read 448974935 bytes from HTTP->GZIP
Read 448974935 bytes from HTTP->file->GZIP
Output from HTTP->GZIP saved to file rezippeds3.txt

As you can see, once the file was recompressed I was able to stream it through GZIPInputStream and get the entire file. The original file still shows the usual premature end of decompression. When I downloaded and re-uploaded the WET file without recompressing it I got the same incomplete streaming behavior, so it was definitely the recompression that fixed it. I also put both the original and the recompressed files onto a traditional Apache web server and was able to replicate the results, so S3 does not seem to have anything to do with the problem.
So, I have a new question.
New Question
Why would a FileInputStream behave differently than an HTTPInputStream when reading the exact same content? If it is the exact same file, why does

new GZIPInputStream(urlConnection.getInputStream());

behave any differently than

new GZIPInputStream(new FileInputStream("./test.wet.gz"));

Isn't an input stream just an input stream?

Regarding your "save the GZIP file locally" code: channels need to be closed, just like InputStreams and OutputStreams. - VGR
OpenJDK bug JDK-8081450 looks like the same issue. - Jeff Harwell
2 Answers


Root Cause Discussion

It turns out that InputStreams can vary quite a bit. In particular, they differ in how they implement the .available() method. For example, ByteArrayInputStream.available() returns the number of bytes remaining in the InputStream. HTTPInputStream.available(), however, returns the number of bytes that can be read before a blocking IO request has to be made to refill the buffer. (See the Java docs for more information.)

The problem is that GZIPInputStream uses the output of .available() to determine whether there might be an additional GZIP file available in the InputStream after it finishes decompressing a complete GZIP file. Here is line 231 of the readTrailer() method from the OpenJDK source file GZIPInputStream.java:

   if (this.in.available() > 0 || n > 26) {

If the HTTPInputStream's read buffer happens to be empty right at the boundary between two concatenated GZIP files, GZIPInputStream calls .available(), which responds with 0 because it would have to go out to the network to refill the buffer, and so GZIPInputStream closes the file prematurely.

The Common Crawl .wet archives are hundreds of megabytes of small concatenated GZIP files, so eventually the HTTPInputStream buffer is emptied right at the end of one of the concatenated GZIP files and GZIPInputStream closes prematurely. This explains the problem demonstrated in the question.
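
To make this concrete, below is a minimal, self-contained sketch (the class name ConcatenatedGzipDemo and the sample strings are mine, not from the original post) that builds two concatenated GZIP members in memory and reads them back through a single GZIPInputStream. Because ByteArrayInputStream.available() reports the bytes remaining in the array, it is never 0 at the member boundary, so both members are decompressed - the same reason the HTTP->file->GZIP path in the question always returns the full archive.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class ConcatenatedGzipDemo {
    // Compress a single string into its own complete GZIP member
    private static byte[] gzip(String s) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gos = new GZIPOutputStream(bos)) {
            gos.write(s.getBytes(StandardCharsets.UTF_8));
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Build a stream of two concatenated GZIP members, a WET archive in miniature
        ByteArrayOutputStream concatenated = new ByteArrayOutputStream();
        concatenated.write(gzip("first member\n"));
        concatenated.write(gzip("second member\n"));

        // ByteArrayInputStream.available() is still > 0 at the member boundary,
        // so GZIPInputStream goes on to read the second member.
        GZIPInputStream gis = new GZIPInputStream(
                new ByteArrayInputStream(concatenated.toByteArray()));

        ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        int n;
        while ((n = gis.read(buffer, 0, buffer.length)) != -1) {
            decompressed.write(buffer, 0, n);
        }
        gis.close();

        // Prints "first member" and "second member"
        System.out.print(decompressed.toString(StandardCharsets.UTF_8.name()));
    }
}

If the ByteArrayInputStream is replaced with an InputStream whose available() can return 0 at a member boundary, GZIPInputStream may stop after the first member, which is exactly the failure mode described above.
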
Solution and Workaround
This GIST contains a patch against jdk8u152-b00 revision 12039 and two jtreg tests that remove what is (in my humble opinion) an incorrect reliance on .available().
If you cannot patch the JDK, the workaround is to make sure that available() always returns > 0, which forces GZIPInputStream to always check for another GZIP file in the stream. Unfortunately, HTTPInputStream is a private class, so you cannot subclass it directly; instead, extend InputStream and wrap the HTTPInputStream. The code below demonstrates this workaround.

Demonstration Code and Output

Here is the output showing that, when the HTTPInputStream is wrapped as discussed and then passed to GZIPInputStream, reading the concatenated GZIP from a file and reading it directly from HTTP produce identical results.

Testing URL https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698540409.8/wet/CC-MAIN-20161202170900-00009-ip-10-31-129-80.ec2.internal.warc.wet.gz
Testing HTTP Input Stream direct to GZIPInputStream
Testing saving to file before decompression
Read 448974935 bytes from HTTP->GZIP
Read 448974935 bytes from HTTP->file->GZIP
Output from HTTP->GZIP saved to file testfile0.wet
------
Testing URL https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698540409.8/wet/CC-MAIN-20161202170900-00040-ip-10-31-129-80.ec2.internal.warc.wet.gz
Testing HTTP Input Stream direct to GZIPInputStream
Testing saving to file before decompression
Read 451171329 bytes from HTTP->GZIP
Read 451171329 bytes from HTTP->file->GZIP
Output from HTTP->GZIP saved to file testfile40.wet
------
Testing URL https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698541142.66/wet/CC-MAIN-20161202170901-00500-ip-10-31-129-80.ec2.internal.warc.wet.gz
Testing HTTP Input Stream direct to GZIPInputStream
Testing saving to file before decompression
Read 453183600 bytes from HTTP->GZIP
Read 453183600 bytes from HTTP->file->GZIP
Output from HTTP->GZIP saved to file testfile500.wet

Here is the demonstration code from the question, modified to use the InputStream wrapper.
import java.net.*;
import java.io.*;
import java.util.zip.GZIPInputStream;
import java.nio.channels.*;

public class GZIPTest {
    // Here is a wrapper class that wraps an InputStream
    // but always returns > 0 when .available() is called.
    // This will cause GZIPInputStream to always make another 
    // call to the InputStream to check for an additional 
    // concatenated GZIP file in the stream.
    public static class AvailableInputStream extends InputStream {
        private InputStream is;

        AvailableInputStream(InputStream inputstream) {
            is = inputstream;
        }

        public int read() throws IOException {
            return(is.read());
        }

        public int read(byte[] b) throws IOException {
            return(is.read(b));
        }

        public int read(byte[] b, int off, int len) throws IOException {
            return(is.read(b, off, len));
        }

        public void close() throws IOException {
            is.close();
        }

        public int available() throws IOException {
            // Always say that we have 1 more byte in the
            // buffer, even when we don't
            int a = is.available();
            if (a == 0) {
                return(1);
            } else {
                return(a);
            }
        }
    }



    public static void main(String[] args) throws Exception {
        // Our three test files from CommonCrawl
        URL url0 = new URL("https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698540409.8/wet/CC-MAIN-20161202170900-00009-ip-10-31-129-80.ec2.internal.warc.wet.gz");
        URL url40 = new URL("https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698540409.8/wet/CC-MAIN-20161202170900-00040-ip-10-31-129-80.ec2.internal.warc.wet.gz");
        URL url500 = new URL("https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698541142.66/wet/CC-MAIN-20161202170901-00500-ip-10-31-129-80.ec2.internal.warc.wet.gz");

        /*
         * Test the URLs and display the results
         */
        test(url0, "testfile0.wet");
        System.out.println("------");
        test(url40, "testfile40.wet");
        System.out.println("------");
        test(url500, "testfile500.wet");
    }

    public static void test(URL url, String testGZFileName) throws Exception {
        System.out.println("Testing URL "+url.toString());

        // First directly wrap the HTTP inputStream with GZIPInputStream
        // and count the number of bytes we read
        // Go ahead and save the extracted stream to a file for further inspection
        System.out.println("Testing HTTP Input Stream direct to GZIPInputStream");
        int bytesFromGZIPDirect = 0;
        URLConnection urlConnection = url.openConnection();
        // Wrap the HTTPInputStream in our AvailableInputStream wrapper
        AvailableInputStream ais = new AvailableInputStream(urlConnection.getInputStream());
        GZIPInputStream gzipishttp = new GZIPInputStream(ais);
        FileOutputStream directGZIPOutStream = new FileOutputStream("./"+testGZFileName);
        int buffersize = 1024;
        byte[] buffer = new byte[buffersize];
        int bytesRead = -1;
        while ((bytesRead = gzipishttp.read(buffer, 0, buffersize)) != -1) {
            bytesFromGZIPDirect += bytesRead;
            directGZIPOutStream.write(buffer, 0, bytesRead); // save to file for further inspection
        }
        gzipishttp.close();
        directGZIPOutStream.close();

        // Save the GZIPed file locally
        System.out.println("Testing saving to file before decompression");
        ReadableByteChannel rbc = Channels.newChannel(url.openStream());
        FileOutputStream outputStream = new FileOutputStream("./test.wet.gz");
        outputStream.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
        outputStream.close();
        rbc.close();

        // Now decompress the local file and count the number of bytes
        int bytesFromGZIPFile = 0;
        GZIPInputStream gzipis = new GZIPInputStream(new FileInputStream("./test.wet.gz"));

        buffer = new byte[1024];
        while((bytesRead = gzipis.read(buffer, 0, 1024)) != -1) {
            bytesFromGZIPFile += bytesRead;
        }
        gzipis.close();

        // The Results
        System.out.println("Read "+bytesFromGZIPDirect+" bytes from HTTP->GZIP");
        System.out.println("Read "+bytesFromGZIPFile+" bytes from HTTP->file->GZIP");
        System.out.println("Output from HTTP->GZIP saved to file "+testGZFileName);
    }

}

I have submitted my patch to the OpenJDK core-libs-dev group for consideration. Check the bug [JDK-8081450](https://bugs.openjdk.java.net/browse/JDK-8081450) if you want to see whether there are any updates. - Jeff Harwell
Thank you for the explanation. I thought I was going crazy, since reading the .warc.gz files from a local file or from HDFS worked fine, but reading them through the S3 client library always stopped after a few hundred records. - Jared

We recently ran into the same bug while trying to stream-decode a large gzip file from AWS S3. We worked around it by replacing the JDK's GZIPInputStream with Apache Commons Compress's GzipCompressorInputStream. With decompressConcatenated=true set in the constructor, GzipCompressorInputStream was able to decode the S3 HTTPInputStream without terminating prematurely.
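
For reference, here is a minimal sketch of that approach against one of the URLs from the question (the class name CommonsCompressStreamTest and the buffer size are my own choices, and it assumes Apache Commons Compress is on the classpath):

import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;

public class CommonsCompressStreamTest {
    public static void main(String[] args) throws Exception {
        URL url = new URL("https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698540409.8/wet/CC-MAIN-20161202170900-00009-ip-10-31-129-80.ec2.internal.warc.wet.gz");
        URLConnection connection = url.openConnection();
        InputStream httpStream = connection.getInputStream();

        long totalBytes = 0;
        byte[] buffer = new byte[8192];
        // decompressConcatenated = true tells Commons Compress to keep reading
        // subsequent GZIP members instead of stopping after the first one.
        try (GzipCompressorInputStream gzipStream =
                     new GzipCompressorInputStream(httpStream, true)) {
            int bytesRead;
            while ((bytesRead = gzipStream.read(buffer, 0, buffer.length)) != -1) {
                totalBytes += bytesRead;
            }
        }
        System.out.println("Read " + totalBytes + " bytes via GzipCompressorInputStream");
    }
}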
