CompletableFuture单任务继续执行多个并行任务

3
我有以下代码:
return CompletableFuture.supplyAsync(() -> {
    return foo; // some custom object
})
.thenAccept(foo -> {
     // ??? need to spawn N async parallel jobs that works on 'foo'
});

第一个任务异步创建了名为foo的对象,然后我需要在它上面运行N个并行进程。有没有更好的方法来完成这个任务呢?
...
CompletableFuture[] parallel = new CompletableFuture[N];
for (int i = 0; i < N; i++) {
    parallel[i] = CompletableFuture.runAsync(() -> {
        work(foo);
    });
}
CompletableFuture.allOf(parallel).join();
...

我不喜欢这种方式,因为在等待N个任务完成时,一个线程会被锁定。

为什么你会有这一行代码 CompletableFuture.allOf(parallel).join();,当你不想等待完成时?没有人要求你等待... - Holger
我当时是盲目的。 - igr
2个回答

2
您可以在特定的先决条件工作上链接任意数量的独立工作,例如:
CompletableFuture<Foo> base=CompletableFuture.supplyAsync(() -> new Foo());
Collections.nCopies(N, base).forEach(f -> f.thenAcceptAsync(foo -> work(foo)));

这将产生N个并行任务,同时调用work(foo),在提供Foo实例的初始任务完成后。

但请记住,底层框架将考虑可用CPU核心数来确定实际执行并行作业的线程池的大小,因此如果N > #cores,则其中一些作业可能会一个接一个地运行。

如果工作受到I/O限制,因此您希望拥有更多的并行线程,则必须指定自己的执行程序。


nCopies/forEach不是必需的,for循环也可以,但它提供了如何处理依赖于所有这些并行作业完成的后续作业的提示:

CompletableFuture<Foo> base=CompletableFuture.supplyAsync(() -> new Foo());
CompletableFuture<Void> all = CompletableFuture.allOf(
    Collections.nCopies(N, base).stream()
        .map(f -> f.thenAcceptAsync(foo -> work(foo)))
        .toArray(CompletableFuture<?>[]::new));

现在您可以使用all来检查所有作业的完成情况或链接其他操作。

如果工作是I/O绑定的,那么你想要更高数量的并行线程,为什么呢?也许你错过了同步的I/O绑定,或者更好的说法是,如果工作是阻塞的。 - acelent
@acelent: I/O bound 是一个用于指称所有可能会阻塞的任务的通用术语,与 CPU bound 任务相对。CompletableFuture 使用的默认执行器适合 CPU bound 任务,这就是需要知道的全部内容... - Holger
I/O bound”是一个通用术语,用于描述所有可能会阻塞的任务,与CPU密集型任务相对。但是,我不太同意这个说法。我已经在AsynchronousSocketChannel的方法上使用了CompletionHandler来桥接CompletableFuture,它绝对不需要比核心数更多的线程池线程,因为它不会阻塞。然而,使用任何类型的阻塞代码,无论是I/O、等待还是睡眠,都可能需要更多的线程池线程,以便有尽可能多的可运行线程与核心数相匹配。等待或睡眠中没有I/O。 - acelent
@acelent:不要试图重新定义已经确立的术语。“I/O bound”使用“I/O”的意义是指CPU本身无法处理的所有内容。如果您将请求推送到未争用的队列中,则即使队列的消费者被称为AsynchronousSocketChannel,它也不是I/O bound。而且,如果您让AsynchronousSocketChannelCompletionHandler完成CompletableFuture,那么您实际上正在使用不同的执行程序——即AsynchronousSocketChannel的执行程序。 - Holger
不要试图重新定义既定术语。--无论如何。嗯,如果你让AsynchronousSocketChannelCompletionHandler完成CompletableFuture,你实际上使用了一个不同的执行器——即AsynchronousSocketChannel的执行器。消费者可以接受这一点,或者使用*Async方法。但这已经偏离了主题,我只是举了这个例子作为实际的"I/O",例如yielding/sleeping或等待锁、互斥量、信号量等都不属于"I/O"。我的意思是,甚至连维基百科的文章也没有支持你的定义。让我们就此不再争论。 - acelent
@acelent:这个术语可以追溯到Unix的早期,甚至更早。只是区分了CPU密集型进程和I/O密集型进程,没有其他区别。你只需要阅读维基百科文章的前两句话就能理解这个信息。对于调度程序来说,这种区分很容易实现,只要根据进程消耗的实际CPU时间多少即可。我看到你显然理解这个主题,但不喜欢措辞,但我会坚持几十年来已经确立的措辞,而不引入不必要的复杂性。 - Holger

0

由于CompletableFuture.allOf已经返回另一个CompletableFuture<Void>,因此您可以在其上执行另一个.thenAccept,并在回调中从parallel中提取CFs的返回值,这样您就避免了调用join


或者我可以在外部创建数组,然后在循环中使用thenAcceptAsync创建并行任务。 - igr

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接