In Spring WebFlux, how do I convert from an `OutputStream` to a `Flux<DataBuffer>`?

I'm building a tarball dynamically and would like to stream it back directly, which should be 100% possible with the .tar.gz format.
The code below is the closest I've come to a data buffer after a lot of searching. Basically, I need something that implements an OutputStream and provides or publishes a Flux<DataBuffer>, so that I can return it from my method and get streaming output instead of buffering the whole tarball in RAM (which I'm fairly sure is what's happening here). I'm using Apache Commons Compress, which has a nice API, but it's entirely based on OutputStream.
I suppose another approach would be to write directly to the response, but I don't think that would be properly reactive? I'm also not sure how to get an OutputStream out of some kind of Response object (see the sketch after the code below).
Incidentally, this is Kotlin, on Spring Boot 2.0.
@GetMapping("/cookbook.tar.gz", "/cookbook")
fun getCookbook(): Mono<DefaultDataBuffer> {
    log.info("Creating tarball of cookbooks: ${soloConfig.cookbookPaths}")

    val transformation = Mono.just(soloConfig.cookbookPaths.stream()
            .toList()
            .flatMap {
                Files.walk(Paths.get(it)).map(Path::toFile).toList()
            })
            .map { files ->

                //Will make one giant databuffer... but oh well? TODO: maybe use some kind of chunking.
                val buffer = DefaultDataBufferFactory().allocateBuffer()
                val outputBufferStream = buffer.asOutputStream()


                //Transform my list of stuff into an archiveOutputStream
                TarArchiveOutputStream(GzipCompressorOutputStream(outputBufferStream)).use { taos ->
                    taos.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU)

                    log.info("files to compress: ${files}")

                    for (file in files) {
                        if (file.isFile) {
                            val entry = "cookbooks/" + file.name
                            log.info("Adding ${entry} to tarball")
                            taos.putArchiveEntry(TarArchiveEntry(file, entry))
                            FileInputStream(file).use { fis ->
                                fis.copyTo(taos) //Copy that stuff!
                            }
                            taos.closeArchiveEntry()
                        }
                    }
                }
                buffer
            }

    return transformation
}
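
For reference on the "write directly to the response" idea: WebFlux never exposes an OutputStream on the response. Instead, a controller can take a ServerHttpResponse parameter and hand a Publisher<DataBuffer> to its writeWith method. A minimal sketch of that shape, assuming the body Flux already exists (here it is faked with DataBufferUtils.read over a hypothetical pre-built file, purely to illustrate the wiring):

import org.springframework.core.io.buffer.DataBuffer
import org.springframework.core.io.buffer.DataBufferUtils
import org.springframework.http.server.reactive.ServerHttpResponse
import org.springframework.web.bind.annotation.GetMapping
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.nio.file.Paths

@GetMapping("/cookbook.tar.gz")
fun getCookbook(response: ServerHttpResponse): Mono<Void> {
    // WebFlux has no OutputStream on the response; the body is a Publisher<DataBuffer>.
    // The Flux here comes from a hypothetical pre-built file, just to show the shape.
    val body: Flux<DataBuffer> = DataBufferUtils.read(
            Paths.get("/tmp/cookbook.tar.gz"), // hypothetical path, not from the original post
            response.bufferFactory(),
            8192)
    return response.writeWith(body)
}

The real question, of course, is how to produce that Flux<DataBuffer> from an OutputStream-based API, which is what the answer below addresses.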
1 Answer

I struggled with this and now have a working solution. You can implement an OutputStream and publish its bytes onto a stream. Be sure to override the close method and send onComplete. Works great!
@RestController
class SoloController(
        val soloConfig: SoloConfig
) {
    val log = KotlinLogging.logger { }

    @GetMapping("/cookbooks.tar.gz", "/cookbooks")
    fun streamCookbook(serverHttpResponse: ServerHttpResponse): Flux<DataBuffer> {
        log.info("Creating tarball of cookbooks: ${soloConfig.cookbookPaths}")

        val publishingOutputStream = PublishingOutputStream(serverHttpResponse.bufferFactory())

        //Needs to set up cookbook path as a parent directory, and then do `cookbooks/$cookbook_path/<all files>` for each cookbook path given
        Flux.just(soloConfig.cookbookPaths.stream().toList())
                .doOnNext { paths ->
                    //Transform my list of stuff into an archiveOutputStream
                    TarArchiveOutputStream(GzipCompressorOutputStream(publishingOutputStream)).use { taos ->
                        taos.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU)

                        paths.forEach { cookbookDir ->
                            if (Paths.get(cookbookDir).toFile().isDirectory) {

                                val cookbookDirFile = Paths.get(cookbookDir).toFile()
                                val directoryName = cookbookDirFile.name
                                val entryStart = "cookbooks/${directoryName}"

                                val files = Files.walk(cookbookDirFile.toPath()).map(Path::toFile).toList()

                                log.info("${files.size} files to compress")

                                for (file in files) {
                                    if (file.isFile) {
                                        val relativePath = file.toRelativeString(cookbookDirFile)
                                        val entry = "$entryStart/$relativePath"
                                        taos.putArchiveEntry(TarArchiveEntry(file, entry))
                                        FileInputStream(file).use { fis ->
                                            fis.copyTo(taos) //Copy that stuff!
                                        }
                                        taos.closeArchiveEntry()
                                    }
                                }
                            }
                        }
                    }
                }
                .subscribeOn(Schedulers.parallel())
                .doOnComplete {
                    publishingOutputStream.close()
                }
                .subscribe()

        return publishingOutputStream.publisher
    }

    class PublishingOutputStream(bufferFactory: DataBufferFactory) : OutputStream() {

        //Outward-facing stream of DataBuffers that the controller returns as the response body
        val publisher: UnicastProcessor<DataBuffer> = UnicastProcessor.create(Queues.unbounded<DataBuffer>().get())
        //Internal stream of raw bytes fed by write(); batched into DataBuffers in init below
        private val bufferPublisher: UnicastProcessor<Byte> = UnicastProcessor.create(Queues.unbounded<Byte>().get())

        init {
            bufferPublisher
                    //Emit a DataBuffer every 4096 bytes or every 100 ms, whichever comes first
                    .bufferTimeout(4096, Duration.ofMillis(100))
                    .doOnNext { intList ->
                        val buffer = bufferFactory.allocateBuffer(intList.size)
                        buffer.write(intList.toByteArray())
                        publisher.onNext(buffer)
                    }
                    .doOnComplete {
                        publisher.onComplete()
                    }
                    .subscribeOn(Schedulers.newSingle("publisherThread"))
                    .subscribe()
        }

        override fun write(b: Int) {
            bufferPublisher.onNext(b.toByte())
        }

        override fun close() {
            bufferPublisher.onComplete() //which should trigger the clean up of the whole thing
        }
    }
}
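
A design note on the processor choice: UnicastProcessor was deprecated in Reactor 3.4 in favour of Sinks. Assuming a newer Reactor version, a roughly equivalent output stream could be sketched as below; the class name, the Sinks-based wiring, and the choice to override the three-argument write (so each chunk becomes one DataBuffer rather than pushing single bytes through a processor) are illustrative assumptions, not part of the answer above.

import org.springframework.core.io.buffer.DataBuffer
import org.springframework.core.io.buffer.DataBufferFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks
import java.io.OutputStream

//Hypothetical variant for Reactor 3.4+, not the class used in the answer above
class SinkPublishingOutputStream(private val bufferFactory: DataBufferFactory) : OutputStream() {

    //Unicast sink plays the role of the UnicastProcessor<DataBuffer> above
    private val sink: Sinks.Many<DataBuffer> = Sinks.many().unicast().onBackpressureBuffer()

    val publisher: Flux<DataBuffer> = sink.asFlux()

    override fun write(b: Int) {
        write(byteArrayOf(b.toByte()), 0, 1)
    }

    override fun write(b: ByteArray, off: Int, len: Int) {
        //Emit each chunk as one DataBuffer instead of buffering individual bytes
        val buffer = bufferFactory.allocateBuffer(len)
        buffer.write(b, off, len)
        sink.emitNext(buffer, Sinks.EmitFailureHandler.FAIL_FAST)
    }

    override fun close() {
        sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST)
    }
}

The controller would use it exactly like PublishingOutputStream: write the tarball into it on a separate scheduler, close it on completion, and return its publisher.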
