如何将Mono的流转换为Flux?

6
我有一个方法,尝试使用WebClient返回Mono。
    @GetMapping("getMatch")
    public Mono<Object> getMatch(@RequestParam Long matchId) {
        return WebClient.create(OpenDotaConstant.BASE_URL).get()
                .uri("/matches/{matchId}", matchId)
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(Object.class);

    }

它可以返回我期望的结果,接着我尝试创建另一个支持参数为List的方法。
    @GetMapping("getMatches")
    public Flux<Object> getMatches(@RequestParam String matchesId) {
        List<Long> matchesList = JSON.parseArray(matchesId, Long.class);

        return Flux.fromStream(matchesList.parallelStream().map(this::getMatch));
    }

但是这次返回了一个奇怪的结果。
[
    {
        "scanAvailable": true
    },
    {
        "scanAvailable": true
    }
]

我刚接触反应式编程,请问如何正确地结合Stream和Mono,然后将它们转换为Flux?


你好。能否详细说明一下你想要实现什么?你想重用 getMatch 方法并返回一个列表吗?你所发布的响应看起来合理。你还期望得到其他什么吗? - MuratOzkan
@MuratOzkan 是的,重复使用 getMatch 方法是我的目的,我已经得到了答案,不管怎样还是谢谢。 - Rephilo
1个回答

9

可能,你需要的是以下内容:

@GetMapping("getMatches")
public Flux<Object> getMatches(@RequestParam String matchesId) {
    List<Long> matchesList = JSON.parseArray(matchesId, Long.class);
    return Flux.fromStream(matchesList.stream())
               .flatMap(this::getMatch);
}

改为:

@GetMapping("getMatches")
public Flux<Object> getMatches(@RequestParam String matchesId) {
    List<Long> matchesList = JSON.parseArray(matchesId, Long.class);
    return Flux.fromStream(matchesList.parallelStream().map(this::getMatch));
}

注意:

  • 基本上,您期望getMatches端点返回Flux<Object>。然而,它实际上返回的是Flux<Mono<Object>>,因此您会看到奇怪的输出。为了获得Flux<Object>,我建议首先创建匹配 ID 的Flux<Long>,然后flatMap调用getMatch的结果(返回Mono<Object>),这最终会给出Flux<Object>

  • 此外,无需使用parallelStream()。因为您已经在使用反应器,一切都将在反应器调度程序上并行执行。


非常有帮助,似乎我误解了 Reactor 的含义。我会尝试寻找一些关于 Spring Flux 的博客或文档来继续学习。谢谢~ - Rephilo

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接