将写入OutputStream的内容转换为Flux<DataBuffer>,以便ServerResponse使用

5

我有一个遗留库需要用来检索文件。这个遗留库不像通常所期望的读取数据时返回InputStream,而是需要传递一个打开的OutputStream,并将数据写入其中。

我需要编写一个Webflux REST服务,将这个OutputStream写入org.springframework.web.reactive.function.server.ServerResponse的正文中。

legacyLib.BlobRead(outputStream); // writes the stream to an outputstream, that has to be provided by me, and somehow has to end up in the ServerResponse

我想直接将数据流传递给ServerResponse,那么我需要像这样做一些事情,对吗?

ServerResponse.ok().body(magicOutpuStreamToFluxConverter(), DataBuffer.class);

你能否修改一下你的问题,让这段代码片段更加清晰明了?没有类型的伪代码会让人难以理解哪部分是从库中获取的实际类型,哪部分是你想要看到的解决方案的伪代码。 - Brian Clozel
1个回答

4

这是RequestHandler中重要的部分。我省略了一些可能通常不需要的错误处理/异常捕获。请注意,我为读取操作发布了一个不同的Scheduler(或者至少我想这样做),以便这个阻塞式读取不会干扰我的主事件线程:

    private Mono<ServerResponse> writeToServerResponse(@NotNull FPTag tag) {
        final long blobSize = tag.getBlobSize();
        return ServerResponse.ok()
            .contentType(MediaType.APPLICATION_OCTET_STREAM)
            .body(Flux.<DataBuffer>create((FluxSink<DataBuffer> emitter) -> {
          // for a really big blob I want to read it in chunks, so that my server doesn't use too much memory
          for(int i = 0; i < blobSize; i+= tagChunkSize) {
            // new DataBuffer that is written to, then emitted later
            DefaultDataBuffer dataBuffer = new DefaultDataBufferFactory().allocateBuffer();
            try (OutputStream outputStream = dataBuffer.asOutputStream()) {
              // write to the outputstream of DataBuffer
              tag.BlobReadPartial(outputStream, i, tagChunkSize, FPLibraryConstants.FP_OPTION_DEFAULT_OPTIONS);
              // don't know if flushing is strictly neccessary
              outputStream.flush();
            } catch (IOException | FPLibraryException e) {
              log.error("Error reading + writing from tag to http outputstream", e);
              emitter.error(e);
            }
            emitter.next(dataBuffer);
          }
          // if blob is finished, send "complete" to my flux of DataBuffers
          emitter.complete();
        }, FluxSink.OverflowStrategy.BUFFER).publishOn(Schedulers.newElastic("centera")).doOnComplete(() -> closeQuietly(tag)), DataBuffer.class);
  
    }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接