如何解压缩Flux<DataBuffer>(以及如何编写一个)?

8
我有一个要求,需要读写压缩的(GZIP)流,但不需要中间存储。目前,我使用Spring的RestTemplate进行写入操作,使用Apache HTTP客户端进行读取操作(有关为什么不能使用RestTemplate读取大型流的说明,请参见此处)。实现相当简单,我将GZIPInputStream放在响应InputStream上并继续处理。
现在,我想切换到使用Spring 5 WebClient (只是因为我不喜欢现状)。然而,WebClient的反应性质是与Flux<Stuff>打交道;我相信它可以获得Flux<DataBuffer>,其中DataBuffer是对ByteBuffer的抽象。问题是,在不必将完整流存储在内存中(OutOfMemoryError, 我看着你),或写入本地磁盘情况下如何即时解压缩?值得一提的是,WebClient在内部使用Netty。
另请参见Reactor Nettyissue-251
还与Spring集成相关的问题issue-2300
我承认对(解)压缩不太熟悉,但是我做了调查,但在网上找到的所有材料似乎都没有什么帮助。
在Java NIO直接缓冲区上进行压缩compression on java nio direct buffers
使用NIO编写GZIP文件Writing GZIP file with nio
从FileChannel中读取GZIP文件(Java NIO)Reading a GZIP file from a FileChannel (Java NIO)
使用NIO进行(解)压缩文件(de)compressing files using NIO
Java中可迭代gzip解压缩Iterable gzip deflate/inflate in Java

仅使用数据缓冲区上的 asInputStream() 方法并将其放入 GZIPInputStream 中是否无法正常工作? - vandale
1
@vandale,我不明白,每个“DataBuffer”只是一个部分,对吧?“GZIPInputStream”应该在完整的“InputStream”上操作,而不是一堆它。有一种方法可以使用“DataBuffer.asInputStream”和“SequenceInputStream”重建“InputStream”,但这将违背使用反应式NIO的目的。 - Abhijit Sarkar
2个回答

4
public class HttpResponseHeadersHandler extends ChannelInboundHandlerAdapter {
    private final HttpHeaders httpHeaders;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HttpResponse &&
                !HttpStatus.resolve(((HttpResponse) msg).status().code()).is1xxInformational()) {
            HttpHeaders headers = ((HttpResponse) msg).headers();

            httpHeaders.forEach(e -> {
                log.warn("Modifying {} from: {} to: {}.", e.getKey(), headers.get(e.getKey()), e.getValue());
                headers.set(e.getKey(), e.getValue());
            });
        }
        ctx.fireChannelRead(msg);
    }
}

然后我创建一个ClientHttpConnector,用于与WebClient一起使用,并在afterNettyContextInit中添加处理程序:

ctx.addHandlerLast(new ReadTimeoutHandler(readTimeoutMillis, TimeUnit.MILLISECONDS));
ctx.addHandlerLast(new Slf4JLoggingHandler());
if (forceDecompression) {
    io.netty.handler.codec.http.HttpHeaders httpHeaders = new ReadOnlyHttpHeaders(
            true,
            CONTENT_ENCODING, GZIP,
            CONTENT_TYPE, APPLICATION_JSON
    );
    HttpResponseHeadersHandler headersModifier = new HttpResponseHeadersHandler(httpHeaders);
    ctx.addHandlerFirst(headersModifier);
}
ctx.addHandlerLast(new HttpContentDecompressor());

当然,对于没有GZIP压缩的响应,这种方法会失败,因此我仅在特定情况下使用此实例的WebClient,在这种情况下,我确定响应是被压缩的。
写作很容易:Spring有一个ResourceEncoder,因此InputStream可以简单地转换为InputStreamResource,就完成了!

3
创建 Spring 的 WebClient 时,我在哪里可以找到 afterNettyContextInit 方法? - membersound

2

在此提醒一下,由于5.1版本的变化,API有所改变,这让我有点困惑。

我有一个与接受答案中ChannelInboundHandler类似的设置:

public class GzipJsonHeadersHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HttpResponse
                && !HttpStatus.resolve(((HttpResponse) msg).status().code()).is1xxInformational()) {
            HttpHeaders headers = ((HttpResponse) msg).headers();
            headers.clear();
            headers.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
            headers.set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
        }
        ctx.fireChannelRead(msg);
    }
}

为简单起见,我需要的标题值已经硬编码在那里,否则它是相同的。

然而,注册是不同的:

WebClient.builder()
    .clientConnector(
            new ReactorClientHttpConnector(
                    HttpClient.from(
                            TcpClient.create()
                                    .doOnConnected(c -> {
                                        c.addHandlerFirst(new HttpContentDecompressor());
                                        c.addHandlerFirst(new HttpResponseHeadersHandler());
                                    })
                    ).compress(true)
            )
    )
    .build();

看起来Netty现在维护了一个与系统列表分开的用户处理程序列表,并且addHandlerFirst()只会将您的处理程序放在用户列表的前面。因此,需要显式调用HttpContentDecompressor以确保它在插入正确标头的处理程序之后执行。


我没有看出区别。在我的答案中,是使用addHandlerLast调用添加了HttpContentDecompressor - Abhijit Sarkar
@AbhijitSarkar 哦,你说得对。唯一的区别在于它在那种情况下添加的位置 - 就像 membersound 一样,我很难看到应该把它放在哪里,然后发现你在最近的版本中使用了上面的代码片段而不是 afterNettyContextInit()。如果你想把那个代码片段加入到你的答案中,我会删除这个(我对声望不感兴趣,只是想让完整的代码示例出现在这里)。 - Michael Berry

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接