如何将 Mono<List<T>> 转换为 Stream<T>?

3

我有一段代码,使用WebClient从Json数组结果创建一个Mono<List<T>>。bodyToMono方法返回一个Mono<List<T>对象,我订阅它并获取一个parallelStream

    final WebClient client = WebClient.create(daemonEndpoint);
    client.get()
        .uri("/services?label=com.docker.stack.namespace")
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .bodyToMono(new ParameterizedTypeReference<List<Map<String, Object>>>() {
        })
        .subscribe(services -> services.parallelStream()
            .map(e -> {
                final String id = (String) e.get("ID");

我想知道是否有一种解决方案可以删除订阅部分。
1个回答

1

根据我使用反应器的经验,你不能在不阻塞调用的情况下将你的Mono转换为Stream,可以按照以下方式完成:

Stream<T> stream = yourMono<T>.map(it -> it.parallelStream()).block()

另一种方法是以反应式的方式处理它(注意,无论如何都必须有人订阅您的发布者,它不能自己完成)

yourMono<T>.flatMapMany(Flux::fromIterable)
           .flatMap(it -> {
              //there goes your <Map<String, Object>>
           });

flatMap 造成了问题,因为它必须为我创建一个发布者。我将其更改为 map,另一个监听器停止工作,所以也许这就是你选择 flatMap 的原因。 - Archimedes Trajano

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接