Java多线程中的Thread、Runnable和CompletableFuture的区别

4

我正在尝试在我的Spring Boot应用程序中实现多线程。我只是Java多线程的初学者,在进行了一些搜索并阅读了各种页面上的文章之后,我需要澄清以下几点。所以;

1. 就我所看到的,我可以使用 ThreadRunnableCompletableFuture 来实现 Java 应用程序中的多线程。 CompletableFuture 似乎是一种更新更清晰的方法,但 Thread 可能具有更多优势。那么,我应该坚持使用 CompletableFuture 还是根据情况使用它们全部? 2. 基本上,我想通过使用 CompletableFuture 向同一个服务方法发送 2 个并发请求:
CompletableFuture<Integer> future1 = fetchAsync(1);
CompletableFuture<Integer> future2 = fetchAsync(2);

Integer result1 = future1.get();
Integer result2 = future2.get();

我该如何并发地发送这些请求,然后根据以下条件返回结果:

  • 如果第一个结果不为空,则返回结果并停止进程
  • 如果第一个结果为空,则返回第二个结果并停止进程

我应该怎么做?我应该使用CompletableFuture.anyOf()吗?


有没有其他人在Java多线程中从未使用过CompletableFuture - Jack
1
当你说“但线程可能有更多的优势”时,你指的是哪些优势?第二点你具体期望什么?你的代码示例已经完成,只需将result1的类型更改为Integer,否则它根本不可能是null。然后,if (result1 == null) return future2.get(); else return result1;使用CompletableFuture没有方法可以“停止进程”。 - Holger
我想要获取两个请求中的第一个响应,而不是特定的响应。例如,有时我会得到future1,有时会得到future2,因为没有保证。所以,我想要获取第一个响应并检查其值。然后根据这个值获取第二个响应或停止第二个线程。我不必仅使用CompletableFuture来解决问题。如果您有任何关于Thread或Runnable的建议,如果您可以通过发布示例代码分享,我将不胜感激。 - Jack
2
CompletableFuture在幕后使用RunnableThread。我不知道您认为通过手动处理RunnableThread可以获得什么优势。如果您想停止任务,您需要一个支持取消的ExecutorService,它返回真正支持取消的Future对象。但是,这仍然需要对您的任务进行主动中断支持,即由fetchAsync封装的操作。 - Holger
@Holger 感谢您的解释。为什么不像其他朋友一样把示例作为答案发布呢? - Jack
3个回答

5

CompletableFuture 是一种工具,它建立在 Executor/ExecutorService 抽象之上,这些实现处理 RunnableThread。通常情况下,您没有理由手动处理 Thread 创建。如果您发现 CompletableFuture 不适用于特定任务,则可以首先尝试其他工具/抽象。

如果您想要使用第一个(更快的)非空结果继续进行,可以使用类似以下的内容:

CompletableFuture<Integer> future1 = fetchAsync(1);
CompletableFuture<Integer> future2 = fetchAsync(2);

Integer result = CompletableFuture.anyOf(future1, future2)
    .thenCompose(i -> i != null?
        CompletableFuture.completedFuture((Integer)i):
        future1.thenCombine(future2, (a, b) -> a != null? a: b))
    .join();

anyOf 允许您使用第一个结果,但不管其实际值如何。因此,为了使用第一个非空结果,我们需要链接另一个操作,如果第一个结果为 null,则会转而使用 thenCombine。这只有在两个 futures 都已完成时才会完成,但此时我们已经知道更快的结果是 null,需要第二个结果。当两个结果都为 null 时,整个代码仍将导致 null

请注意,anyOf 接受任意类型的 futures,并产生一个 CompletableFuture<Object>。因此,i 的类型为 Object,需要进行类型转换。具有完全类型安全性的替代方法是

CompletableFuture<Integer> future1 = fetchAsync(1);
CompletableFuture<Integer> future2 = fetchAsync(2);

Integer result = future1.applyToEither(future2, Function.identity())
    .thenCompose(i -> i != null?
        CompletableFuture.completedFuture(i):
        future1.thenCombine(future2, (a, b) -> a != null? a: b))
    .join();

这需要我们指定一个我们不需要的函数,因此这段代码使用了 Function.identity()。你也可以使用 i -> i 来表示一个恒等函数;这主要是一种风格选择。


请注意,大多数复杂性源于设计,试图通过始终将依赖操作链接到前一阶段完成后执行来避免阻塞线程。上面的示例遵循此原则,因为最终的join()调用仅用于演示目的;如果调用方期望未来而不是被阻止,则可以轻松删除它并返回未来。
如果您无论如何都要执行最终的阻塞join(),因为您需要立即获得结果值,则还可以使用。
Integer result = future1.applyToEither(future2, Function.identity()).join();
if(result == null) {
    Integer a = future1.join(), b = future2.join();
    result = a != null? a: b;
}

这样可能更容易阅读和调试。这种易用性是即将推出的虚拟线程功能背后的动力。当一个操作在虚拟线程上运行时,您不需要避免阻塞调用。因此,使用此功能,如果您仍然需要返回一个CompletableFuture而不阻塞您的调用线程,您可以使用

CompletableFuture<Integer> resultFuture = future1.applyToEitherAsync(future2, r-> {
    if(r != null) return r;
    Integer a = future1.join(), b = future2.join();
    return a != null? a: b;
}, Executors.newVirtualThreadPerTaskExecutor());

通过为依赖操作请求虚拟线程,我们可以在函数内部使用阻塞的join()调用而不必犹豫,这使得代码更简单,实际上类似于之前的非异步变体。
在所有情况下,如果代码是非空的,则会提供更快的结果,而不必等待第二个future完成。但它不会停止不必要的future的评估。已经进行中的评估不能被CompletableFuture支持。您可以在其上调用cancel(…),但这只会将未来的完成状态(结果)设置为“异常完成,带有CancellationException
因此,无论您是否调用cancel,已经进行中的评估都将在后台继续进行,并且仅忽略其最终结果。

对于某些操作,这可能是可以接受的。如果不行,您将不得不显着更改 fetchAsync 实现。您可以直接使用 ExecutorServicesubmit 操作以获得支持中断的 Future

但这也需要操作代码对中断敏感并具有实际效果:

  • 在调用阻塞操作时,请使用那些可能会中止并抛出 InterruptedException 的方法,而不是捕获并继续执行。

  • 在执行长时间运行的计算密集型任务时,请定期轮询 Thread.interrupted() 并在 true 时退出。


非常非常好,实际上是完美的解释。这正是我期望有经验的用户所做的。非常感谢。 - Jack

4
那么,我应该坚持使用CompletableFuture还是根据具体场景使用它们各自的优势呢?
应根据场景选择最合适的方法。当然,除非您介绍了场景,否则我们不能更加具体。
需要考虑各种因素,例如:
Thread + Runnable没有一种自然的方式来等待/返回结果。(但实现并不难)
反复创建裸线程对象效率低下,因为线程创建成本高昂。 线程池是更好的选择,但您不应实现线程池自己。
使用ExecutorService的解决方案处理线程池并允许您使用Callable并返回Future。 但对于一次性异步计算,这可能会过度杀伤。
涉及ComputableFuture的解决方案允许您组合和合并异步任务。 但如果您不需要这样做,则使用ComputableFuture可能会过度杀伤。
正如您所看到的...对于所有场景,没有一个单一正确的答案。
我应该使用 CompletableFuture.anyOf() 吗?
不需要。你的示例逻辑要求你必须先有 future1 的结果才能确定是否需要 future2 的结果。所以解决方案类似于这样:
Integer i1 = future1.get();
if (i1 == null) {
    return future2.get();
} else {
    future2.cancel(true);
    return i1;
}

请注意,上面的代码与普通的FutureCompletableFuture都可以一起使用。如果之前你使用CompletableFuture是因为认为anyOf是解决方案,那么你其实不需要这样做。调用ExecutorService.submit(Callable)将会给你一个Future ...
如果需要处理任务抛出的异常和/或超时操作,那么情况将会更加复杂。在前一种情况下,你需要捕获ExecutionException并提取其cause异常以获取任务引发的异常。
还有一个警告:第二个计算可能会忽略中断并继续运行。

1
ExecutorService 操作返回 Future,而不是 CompletableFuture。但是您可以将 ExecutorService 用作 CompletableFuture 的参数传递给 …Async 方法。此外,cancel() 需要一个布尔参数,但在此处它实际上没有任何效果。 - Holger
@StephenC 感谢您提供这些精彩的解释。在这个阶段,我认为我们必须获取future1或future2,因为不可能获取它们中的第一个,对吧?另一个问题是,我该如何将异常添加到这个解决方案中呢? - Jack
不是因为那个原因。原因在于你的条件说明只有在future1为“null”时才需要获取future2。所以在决定是否需要调用future2.get之前,你需要先得到future1的答案。 - Stephen C

3
那么,我应该坚持使用CompletableFuture还是根据情况使用它们所有的呢?
嗯,它们都有不同的目的,你可能会直接或间接地使用它们所有:
  • Thread 表示一个线程,虽然它可以被子类化,但在大多数情况下不应该这样做。许多框架维护线程池,即它们会启动几个线程,然后这些线程可以从任务池中获取任务。这样做是为了减少线程创建带来的开销,并减少争用量(许多线程和少数 CPU 核心意味着很多上下文切换,因此通常会尝试只有较少的线程依次处理一个任务)。
  • Runnable 是表示线程可以处理的任务之一的最早接口。另一个是 Callable,它与 Runnable 有两个主要区别:1)它可以返回值,而 Runnable 只有 void,2)它可以抛出已检查异常。根据您的情况,您可以使用任何一个,但由于您想要获得结果,您更可能使用 Callable
  • CompletableFutureFuture 基本上是跨线程通信的一种方式,即您可以使用它们来检查任务是否已完成(非阻塞),或者等待完成(阻塞)。

因此,在许多情况下,它是这样的:

  • 你向某个执行器提交一个RunnableCallable
  • 执行器维护一组Thread来执行你提交的任务
  • 执行器返回一个Future(其中一种实现是CompletableFuture),让你可以检查任务的状态和结果,而无需自己进行同步。

然而,在其他情况下,你可能会直接向Thread提供一个Runnable,甚至是子类Thread,但现在这些情况已经很少见了。

我该怎么做?我应该使用CompletableFuture.anyOf()吗?

CompletableFuture.anyOf()不起作用,因为你无法确定你传入的两个中哪一个先成功。

由于你首先关心的是result1(顺便说一句,如果类型是int,它不能为null),你基本上想要做以下事情:

Integer result1 = future1.get(); //block until result 1 is ready
if( result1 != null ) {
  return result1;
} else {
  return future2.get(); //result1 was null so wait for result2 and return it
}

你不会想立即调用 future2.get(),因为那将一直阻塞直到两个都完成,而是你只对 future1 感兴趣,如果它产生了结果,你就不需要等待 future2 完成。

请注意,上面的代码没有处理异常完成,并且可能有更优雅的组合未来方式,但我暂时不记得了(如果我记得,我会在编辑中添加它)。

另一个注意事项:如果 result1 不为空,你可以调用 future2.cancel(),但我建议你首先检查取消是否有效(例如,你很难真正取消一个 Web 服务请求),以及中断服务的结果是什么。如果只是让它完成并忽略结果,那可能是更容易的方法。


非常感谢您提供这些精彩的解释。在这个阶段,我认为我们必须获取future1或future2,因为不可能获取它们中的第一个,对吧?另一个问题是,我该如何将异常添加到这个解决方案中呢? - Jack
我不确定你问题的第一部分,但是你基本上可以这样做:如果你想使用任何一个未来完成的结果,那么可以使用CompletableFuture.anyOf()。如果你想优先使用未来1而不是2,你需要使用上述方法。至于异常:如果未来失败或被中断,get()可能会抛出异常,因此你需要处理它。 - Thomas

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接