Spring WebFlux的Flux<DataBuffer>如何转换为InputStream

7
我目前正在使用Spring WebFlux工作。 我正在尝试使用Spring WebFlux上传大文件(70mo)。
我的控制器
@RequestMapping(method = RequestMethod.POST, consumes = MediaType.MULTIPART_FORM_DATA_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
public Flux<String> uploadHandler(@RequestBody Flux<Part> fluxParts, @RequestParam(value = "categoryType") String categoryType, @PathVariable(value = "traceabilityReportUuid") String traceabilityUuid) {
    return documentHandler.upload(fluxParts, UUID.fromString(traceabilityUuid), categoryType);
}

我的服务

public Flux<String> upload(Flux<Part> fluxParts, UUID traceabilityUuid, String categoryType) {

    return fluxParts
            .filter(part -> part instanceof FilePart)
            .ofType(FilePart.class)
            .flatMap(p -> this.upload(p, traceabilityUuid, categoryType));


}


private Mono<String> upload(FilePart filePart, UUID traceabilityUuid, String categoryType) {

    return filePart.content().collect(InputStreamCollector::new, (t, dataBuffer) -> t.collectInputStream(dataBuffer.asInputStream()))
            .flatMap(inputStreamCollector -> {
                upload(traceabilityUuid, inputStreamCollector.getInputStream(), filePart.filename(), categoryType);

                return Mono.just("OK");
            });
}

我的收藏家
public class InputStreamCollector {

    private InputStream is;

    public void collectInputStream(InputStream is) {
        if (this.is == null) this.is = is;
        this.is = new SequenceInputStream(this.is, is);
    }

    public InputStream getInputStream() {
        return this.is;
    }
}

最后,我通过以下方式检索完整的输入流: inputStreamCollector.getInputStream() 并将其传递给我的对象。
然后,我使用此对象发送到S3存储桶。
但在发送到S3之前,我必须将其转换为文件(使用Apache工具),但我遇到了一个stackoverflow异常。
java.lang.StackOverflowError: null
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)

它对小文件(7兆字节)工作正常。
请问您有解决我的问题的想法吗?
4个回答

6

4
最终,这并不是一个好的解决方案 :)最好的解决方案是http://blog.davidvassallo.me/2018/07/09/reactive-spring-webflux-multipart-file-upload/。 - ek1984
你最终使用了Multipart / File,而不是缓冲区吗? - Fred
1
注意 Spring 同步解析请求体中的 FilePart。当上传大文件作为 1 multipart/file 时,首先将数据传输到临时文件: System.getProperty("java.io.tmpdir") + "/nio-file-upload/nio-body-<partIndex>-<UUID>.tmp"。当上传完成时,Flux<Part> 开始发出事件。 我认为 Flux<Part> 在上传多个小文件时非常有用。 - Andrii Ofim

2

要将 DataBuffer 转换为 StringList,您可以使用 Apache IOUtils。在此示例中,我返回了一个 Flux,并为避免 try/catch 问题,我使用 Mono.fromCallable 进行了包装。

protected Flux<String> getLines(final DataBuffer dataBuffer) {
    return Mono.fromCallable(() -> IOUtils.readLines(dataBuffer.asInputStream(), Charsets.UTF_8))
        .flatMapMany(Flux::fromIterable);
}

4
在非阻塞框架中使用阻塞API并不理想。 - Dormouse

1

假设有一个Flux<DataBuffer> fluxPublisher

final var publisher = fluxPublisher
  .map(dataBuffer -> dataBuffer.asInputStream(true))
  .reduce(SequenceInputStream::new);

我会给你一个Mono<InputStream>

处理完毕后,必须关闭结果,否则会泄漏。以下是使用Reader进行解析并关闭的示例:

publisher
  .flatMap(is-> {
    try (var reader = new InputStreamReader(is)) {
      // parse
    } catch (IOException e) {
      return Mono.error(e);
    }
  });

1

这个例子将帮助您了解如何从FilePart加载数据:

public static Mono<String> readBase64Content(FilePart filePart) {
    return filePart.content().flatMap(dataBuffer -> {
         byte[] bytes = new byte[dataBuffer.readableByteCount()];
         dataBuffer.read(bytes);
         String content = Base64.getEncoder().encodeToString(bytes);
         return Mono.just(content);
    }).last();
}

Rest方法

@PostMapping(value = "/person/{personId}/photo", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
Mono<String> uploadPhoto(@PathVariable Long personId, @RequestPart("photo") Mono<FilePart> photo) {
        return photo.ofType(FilePart.class).flatMap(StringUtil::readBase64Content);
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接