如何将ByteBuffer的Flux转换为Spring的BodyInserter

3

我有一个使用场景,需要从s3中读取文件,并在Java中发布到REST服务。
为了实现这个目标,我正在尝试awssdk s3 API来读取文件,它将返回Flux<ByteBuffer>。然后,我会使用Spring的WebClient发布到REST服务。

经过我的探索,Spring的WebClient需要BodyInserter,可以使用BodyInserters.fromDataBuffers来准备。但我无法弄清楚如何正确地将Flux转换为Flux并调用WebClient exchange方法。

Flux<ByteBuffer> byteBufferFlux = getS3File(key);
        Flux<DataBuffer> dataBufferFlux= byteBufferFlux.map(byteBuffer -> {
            ?????????????Convert bytebuffer to data buffer ??????
            return dataBuffer;
        });

        BodyInserter<Flux<DataBuffer>, ReactiveHttpOutputMessage> inserter = BodyInserters.fromDataBuffers(dataBufferFlux);

有建议可以实现这个吗?
2个回答

3
你可以使用DefaultDataBuffer进行转换,你可以通过DefaultDataBufferFactory创建它。
DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();

Flux<DataBuffer> buffer = getS3File(key).map(dataBufferFactory::wrap);

BodyInserter<Flux<DataBuffer>, ReactiveHttpOutputMessage> inserter =
    BodyInserters.fromDataBuffers(buffer);

实际上,如果使用Webclient,您根本不需要BodyInserter,您可以使用以下body()方法签名。

<T, P extends Publisher<T>> RequestHeadersSpec<?> body(P publisher, Class<T> elementClass);

您可以直接将Flux<ByteBuffer>通过指定要使用的类名传递到其中。

    WebClient.create("http://someUrl")
            .post()
            .uri("/someUri")
            .body(getS3File(key),ByteBuffer.class)

1
我很惊讶地发现谷歌上找到答案是多么困难。遇到了同样的问题,你的解决方案帮了我很大忙,谢谢! - Igor Petrov

1

您可能不需要使用dataBufferFlux,可以直接将Flux写入REST端点。请尝试以下方法:

  Flux<ByteBuffer> byteBufferFlux = getS3File(key);

  BodyInserter<Flux<ByteBuffer>, ReactiveHttpOutputMessage> = BodyInserters.fromPublisher(byteBufferFlux, ByteBuffer.class);

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接