如何将List<Mono<T>>转换为Mono<List<T>>?

18

我有一个返回 Mono<Output> 的方法:

interface Processor {
  Mono<Output> process(Input input);
}

我想为一个集合执行这个processor方法:

List<Input> inputs = // get inputs
Processor processor = // get processor
List<Mono<Output>> outputs = inputs.stream().map(supplier::supply).collect(toList());

但是我想得到的不是List<Mono<Output>>,而是包含聚合结果的Mono<List<Output>>

我尝试使用reduce,但最终结果看起来非常笨拙:

Mono<List<Output>> result = inputs.stream().map(processor::process)
    .reduce(Mono.just(new ArrayList<>()),
        (monoListOfOutput, monoOfOutput) ->
            monoListOfOutput.flatMap(list -> monoOfOutput.map(output -> {
              list.add(output);
              return list;
            })),
        (left, right) ->
            left.flatMap(leftList -> right.map(rightList -> {
              leftList.addAll(rightList);
              return leftList;
            })));

我能用更少的代码实现这个吗?


1
你可以使用Mono.just将“collect(toList)”的调用包装起来,并使用map(Mono::block())。 - manf
@manf 那我会得到 Mono<List<Mono<Output>>>。但我只需要 Mono<List<Output>>。这是我已经尝试过的方法。 - Ilya Zinkovich
1
@manf 我不想阻止。 - Ilya Zinkovich
2个回答

32

如果没有任何原因需要创建流,您可以从输入创建Flux,映射它并收集列表

Flux.fromIterable(inputs).flatMap(processor::process).collectList();

即使您以“outputs”而不是“inputs”开始,也可以使用相同的方法(编辑了我的答案)。但这样做会更好。 - Alexey Romanov

11
// first merge all the `Mono`s:
List<Mono<Output>> outputs = ...
Flux<Output> merged = Flux.empty();
for (Mono<Output> out : outputs) {
    merged = merged.mergeWith(out);
}

// then collect them
return merged.collectList();

或者(受到亚历山大回答的启发)

Flux.fromIterable(outputs).flatMap(x -> x).collectList();

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接