如何将CompletableFuture <Stream <T>> 转换为Stream <CompletableFuture <T>>?

4
那就是这样了。实在找不到任何相似的东西。我想要实现以下任务链。
List<Item> items = Collections.singletonList(new Item("John Smith", "1997-2014"));

Stream<CompletableFuture<List<ScrappingResult>>> scrappingFutures =
    items.stream().map(item ->
        CompletableFuture.supplyAsync(() -> new ScrappingTask(item).call()));

Stream<CompletableFuture<ScrappingResult>> scrappingFuturesUnwrapped =
    scrappingFutures.map(resultsFuture -> ???);

Stream<CompletableFuture<TrimmingResult>> trimmingResults = scrappingFuturesUnwrapped.map(resultFuture ->
    // thenCompose?
    resultFuture.thenCompose(result -> {
        Path clipsDir = Paths.get("./"
            + result.getItem().getName()
            + "/" + result.getItem().getTimespan());

        AtomicInteger clipIdx = new AtomicInteger();

        return result.getVideo().getClips().stream()
            .map(clip -> CompletableFuture.supplyAsync(() ->
                new TrimmingTask(
                    ffmpegPath,
                    result.getVideo().getVideoUrl(),
                    clip,
                    clipsDir.resolve("clip_" + clipIdx.incrementAndGet() + ".mp3")).call())
            );
    });
);

最后一行语法不正确,但我希望能传达这个想法。所以,我想执行类似于flatMap两次,最终得到Stream<CompletableFuture<TrimmingResult>>

我该怎么做呢?

谢谢。


1
使用实际的代码会使问题更清晰。 - assylias
@assylias 添加了代码。 - Valya
1
您的代码与您在问题标题中编写的不匹配。您的代码中没有CompletableFuture<Stream<T>>. - Holger
1
除了标题不匹配之外,为什么你要坚持通过进行两个这些转换来使你的代码变得复杂呢? - Holger
3个回答

3
如果我没有误解您的意图,您只想使结果扁平化。您可以使用Spliterator来延迟接收结果,并使用flatMapStream合并为一个扁平的流,例如:
Stream<CompletableFuture<ScrappingResult>> scrappingFuturesUnwrapped =
               scrappingFutures.flatMap(each -> unwrap(each));

static <T> Stream<? extends CompletableFuture<T>> 
  unwrap(CompletableFuture<? extends List<? extends T>> master) {

    return generate(new Predicate<Consumer<? super CompletableFuture<T>>>() {

        private Iterator<? extends T> cursor;

        @Override
        public boolean test(Consumer<? super CompletableFuture<T>> consumer) {
            cursor = cursor == null ? await().iterator() : cursor;
            if (cursor.hasNext()) {
                consumer.accept(completedFuture(cursor.next()));
                return true;
            }
            return false;
        }

        //                        v--- blocked at the first time
        private List<? extends T> await() {
            try {
                return master.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new CompletionException(e);
            }
        }
    });
}

static <T> Stream<? extends CompletableFuture<T>> 
  generate(Predicate<Consumer<? super CompletableFuture<T>>> generator) {

    long unknownSize = Long.MAX_VALUE;
    return stream(new AbstractSpliterator<CompletableFuture<T>>(unknownSize, 0) {
        public boolean tryAdvance(Consumer<? super CompletableFuture<T>> action) {
            return generator.test(action);
        }
    }, false);
}

摘要

上述解决方案只是我的第一个想法,并不是在这种情况下最好的方法,您可以与大设计先行对比思考。然而,即使它是一个不好的解决方案,但我仍然会保留它,因为它可能会让其他人以其他方式思考。有关更多详细信息,请参见@Holger在此处和那里的评论。

我承认@Holger所说的最佳方法,由于没有人写下来,请允许我记录下来以服务更多人。

Stream<CompletableFuture<ScrappingResult>> scrappingFuturesUnwrapped =
               scrappingFutures.flatMap(each -> each.join().stream())
                               .map(CompletableFuture::completedFuture);

3
我不理解将unwrapgenerate分成两个方法的目的。你在test中写的内容可以直接放到tryAdvance方法中。但无论如何,这并不比一个简单的.flatMap(each -> each.join().stream()) .map(CompletableFuture::completedFuture)更懒惰。 - Holger
@Holger 是的,先生。我同意,在回答后,我发现它可以像您的一样简化。但是我想首先展示Spliterator的用法,也许这会有点开销, :) - holi-java
@Holger 我使用了 Predicate 将迭代逻辑从 generate 中分离出来。我们可以考虑将 Predicate 适配成 Spliterator。我说的对吗,先生? - holi-java
这里没有真正的分离,因为99%的逻辑都在unwrap方法中,而且Predicate处理的是一个Iterator,它本身就带有迭代逻辑。 - Holger
@Holger 感谢您的审查。那么答案是反模式吗?您能给我一些关键词或建议吗?我对此很感兴趣。 - holi-java
@Holger,实际上第一个可行代码中并没有generate方法。我记得使用了Stream#generate,但是在发现jdk中没有适合的内置函数接口之后,我将其与Predicate一起提取出来了 :) - holi-java

1
您可以使用Stream.reduceStream<CompletableFuture<T>>转换为CompletableFuture<Stream<T>>:
static <T> CompletableFuture<Stream<T>> reduce(Stream<CompletableFuture<T>> futures) {
    return futures
        .map(future -> future.thenApply(Stream::of))
        .reduce(
            CompletableFuture.completedFuture(Stream.empty()),
            MyClass::combine);
}

为了使其正常工作,我们仍需要定义一个帮助函数combine,用于组合两个CompletableFuture<Stream<T>>实例,并连接各自未来中的流:
static <T>CompletableFuture<Stream<T>> combine(CompletableFuture<Stream<T>> future1, CompletableFuture<Stream<T>> future2) {
    return future1
        .thenCompose(stream1 -> future2
        .thenApply(stream2 -> Stream.concat(stream1, stream2)));
}

0
通常情况下,您无法执行此类反转操作。在函数类别语言中,您有两个函子在以下类别之间:
T --> Stream<T>
Stream<T> --> CompletableFuture<Stream<T>>

您没有一个函数对象。
T --> CompletableFuture<T>
CompletableFuture<T> --> Stream<CompletableFuture<T>>

如果您不想关心外部的CompletableFuture,并且需要Stream语义,您需要某种类型的StreamTmonad transfomer。目前Java不支持高阶类型,因此我不确定(但也许可以应用一些非标准技巧)是否可能实现类似StreamT的东西。


我不确定我同意第一个观点。将T--> CompletableFuture<T>转换为以下内容:final CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() .... - Gonen I
@user889742,我不太明白你的意思。当然可以使用,根据单子定义,我们需要有 pure(t: T): M[T],但是仅凭 T --> CompletableFuture<Stream<T>> 就无法构建 T --> Stream<CompletableFuture<T>>。如果你有一些集合 A --> B --> C 之间的映射关系,你能构建出 A --> C --> B 吗?对我来说没有意义。 - St.Antario

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接