如何发出一个真实的Flux请求?

6
我将为您翻译以下内容,这是一篇关于IT技术的文章:

我开始学习Spring-boot中的Webflux。我了解到,对于RestController的端点,您可以定义一个Flux请求体,其中我期望有一个真正的流式Flux,也就是说,整个请求的各个部分会依次出现,并且这些部分也可以逐个进行处理。但是,在使用客户端和服务器构建一个小示例之后,我无法按预期工作。

下面是服务器的代码片段:

@PostMapping("/digest")
    public Flux<String> digest(@RequestBody Flux<String> text) {
        continuousMD5.reset();
        return text.log("server.request.").map(piece -> continuousMD5.update(piece)).log("server.response.");
    }

注意:每段文本都将被发送到一个continuousMD5对象,该对象将累积所有片段并计算并返回每次累积后的中间MD5哈希值。在MD5计算之前和之后,将记录流。

下面是客户端的代码片段:

@PostConstruct
    private void init() {
        webClient = webClientBuilder.baseUrl(reactiveServerUrl).build();
    }

@PostMapping(value = "/send", consumes = MediaType.TEXT_PLAIN_VALUE)
    public Flux<String> send(@RequestBody Flux<String> text) {
        return webClient.post()
            .uri("/digest")
            .accept(MediaType.TEXT_PLAIN)
            .body(text.log("client.request."), String.class)
            .retrieve().bodyToFlux(String.class).log("client.response.");
    }

注意:客户端接受一段文本的流并记录该流,并将其作为流发送到服务器。

令人惊讶的是,我通过以下命令行使REST请求成功,并使客户端接收到一个文本流:

for i in $(seq 1 100); do echo "The message $i"; done | http POST :8080/send  Content-Type:text/plain

我可以在客户端的日志中看到:

2019-05-09 17:02:08.604  INFO 3462 --- [ctor-http-nio-2] client.response.Flux.MonoFlatMapMany.2   : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
2019-05-09 17:02:08.606  INFO 3462 --- [ctor-http-nio-2] client.response.Flux.MonoFlatMapMany.2   : request(1)
2019-05-09 17:02:08.649  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : onSubscribe(FluxSwitchIfEmpty.SwitchIfEmptySubscriber)
2019-05-09 17:02:08.650  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(32)
2019-05-09 17:02:08.674  INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1      : onNext(The message 1)
2019-05-09 17:02:08.676  INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.676  INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1      : onNext(The message 2)
...
2019-05-09 17:02:08.710  INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1      : onNext(The message 100)
2019-05-09 17:02:08.710  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.710  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.710  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.711  INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1      : onComplete()
2019-05-09 17:02:08.711  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.711  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.860  INFO 3462 --- [ctor-http-nio-6] client.response.Flux.MonoFlatMapMany.2   : onNext(CSubeSX3yIVP2CD6FRlojg==)
2019-05-09 17:02:08.862  INFO 3462 --- [ctor-http-nio-6] client.response.Flux.MonoFlatMapMany.2   : onComplete()
^C2019-05-09 17:02:47.393  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : cancel()

每个文本片段都被识别为流数据的一个元素并分别请求。

但在服务器日志中:

2019-05-09 17:02:08.811  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : onSubscribe(FluxSwitchIfEmpty.SwitchIfEmptySubscriber)
2019-05-09 17:02:08.813  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : onSubscribe(FluxMap.MapSubscriber)
2019-05-09 17:02:08.814  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : request(1)
2019-05-09 17:02:08.814  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.838  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : onNext(The message 1The message 2The message 3The message 4The message 5The message 6The message 7The message 8The message 9The message 10The message 11The message 12The message 13The message 14The message 15The message 16The message 17The message 18The message 19The message 20The message 21The message 22The message 23The message 24The message 25The message 26The message 27The message 28The message 29The message 30The message 31The message 32The message 33The message 34The message 35The message 36The message 37The message 38The message 39The message 40The message 41The message 42The message 43The message 44The message 45The message 46The message 47The message 48The message 49The message 50The message 51The message 52The message 53The message 54The message 55The message 56The message 57The message 58The message 59The message 60The message 61The message 62The message 63The message 64The message 65The message 66The message 67The message 68The message 69The message 70The message 71The message 72The message 73The message 74The message 75The message 76The message 77The message 78The message 79The message 80The message 81The message 82The message 83The message 84The message 85The message 86The message 87The message 88The message 89The message 90The message 91The message 92The message 93The message 94The message 95The message 96The message 97The message 98The message 99The message 100)
2019-05-09 17:02:08.840  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : onNext(CSubeSX3yIVP2CD6FRlojg==)
2019-05-09 17:02:08.852  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : request(32)
2019-05-09 17:02:08.852  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : request(32)
2019-05-09 17:02:08.852  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : onComplete()
2019-05-09 17:02:08.852  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : onComplete()
2019-05-09 17:02:47.394  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : cancel()
2019-05-09 17:02:47.394  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : cancel()

我看到所有的文本片段同时到达服务器,因此被处理为通量流中的一个大元素(可以验证只计算了一个MD5哈希值而不是100个)。
我期望服务器也能接收来自客户端的文本片段作为通量流中的元素,否则对于服务器来说,它不是真正的反应式请求,而只是一个普通的阻塞请求。
请问有人可以帮助我理解如何使用Webflux进行真正的通量流响应请求吗?谢谢!
更新:
我使用类似的命令行对服务器进行REST请求,可以看到服务器将文本片段(“The message x”)作为通量流接收。因此,我想服务器没问题了,现在的问题可能是客户端:我该如何使用WebClient进行真正的通量流REST请求呢?

响应式模型仍然映射到HTTP,我很难理解你所说的“pieces”的含义:HTTP在离散请求中运行,这定义了您事件的边界。 - chrylis -cautiouslyoptimistic-
当我发送了带有“消息x”的REST请求,从1到100时,客户端逐个接收这些消息,或者如果我猜测,Webflux框架将整个主体拆分为流通量的元素。我期望服务器也会做同样的事情,因为客户端也发送了一个流通量。但是我看到服务器将流通量的所有元素作为一个大块接收,并没有“拆分”流通量中的元素。我希望这样解释更好理解。 - edwardcarlfox
2个回答

2

感谢指出这些要点!我甚至没有考虑到 MIME 类型可能会产生影响。然而,如果我阅读 Webflux 的文档,在使用 Flux 作为请求体时,并没有明确要求 MIME 类型或 WebSocket 协议。因此,我认为 Webflux 应该足够智能,可以检测传入的请求并尝试将其解码为 Flux。 - edwardcarlfox
是的,从文档中并不是很清楚。但大部分混淆都来自于HTTP和响应式模型之间的差异 - 想象一下如果你在application/json内容类型的响应中返回一个无限的Flux会发生什么。总的来说,始终验证您的期望非常重要,最好使用自动化测试,因为库的默认行为甚至可以在主要版本之间更改。 - Ilya Zinkovich

0

经过尝试和阅读更多文档,我终于弄清楚了如何使我的示例工作:

对于客户端,我需要确保发送到服务器的请求正文也通过换行符进行分隔:

@PostMapping(value = "/send", consumes = MediaType.TEXT_PLAIN_VALUE)
    public Flux<String> send(@RequestBody Flux<String> text) {
        return webClient.post()
            .uri("/digest")
            .accept(MediaType.TEXT_PLAIN)
            .body(
                text
                    .onBackpressureBuffer()
                    .log("client.request.")
                    .map(piece -> piece + "\n"),
                String.class)
            .retrieve().bodyToFlux(String.class)
            .onBackpressureBuffer()
            .log("client.response.");
    }

这个操作可以达到与通过命令行进行REST请求相同的效果,例如for i in $(seq 1 100); do echo "The message $i"; done会将"The message x"分别输出到每一行。

同样地,对于服务器来说,响应体也需要使用换行符进行分隔,以便客户端可以将响应体解码为流:

@PostMapping("/digest")
    public Flux<String> digest(@RequestBody Flux<String> text) {
        continuousMD5.reset();
        return text
            .log("server.request.")
            .map(piece -> continuousMD5.update(piece))
            .map(piece -> piece + "\n")
            .log("server.response.");
    }

我在客户端发送和接收之前添加了onBackpressureBuffer(),以便在发送大量消息时不会出现溢出异常。

然而,尽管上述代码“有效”,但它并没有真正实现流式传输,因为我可以在日志中看到,服务器在客户端发送整个请求体后才开始接收请求体,而客户端在服务器发送整个响应体后才开始接收响应体。也许像Ilya Zinkovich提到的那样,使用WebSocket协议可以实现真正的流式传输效果,但我还没有尝试过。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接