CompletableFuture
是一种工具,它建立在 Executor
/ExecutorService
抽象之上,这些实现处理 Runnable
和 Thread
。通常情况下,您没有理由手动处理 Thread
创建。如果您发现 CompletableFuture
不适用于特定任务,则可以首先尝试其他工具/抽象。
如果您想要使用第一个(更快的)非空结果继续进行,可以使用类似以下的内容:
CompletableFuture<Integer> future1 = fetchAsync(1);
CompletableFuture<Integer> future2 = fetchAsync(2);
Integer result = CompletableFuture.anyOf(future1, future2)
.thenCompose(i -> i != null?
CompletableFuture.completedFuture((Integer)i):
future1.thenCombine(future2, (a, b) -> a != null? a: b))
.join();
anyOf
允许您使用第一个结果,但不管其实际值如何。因此,为了使用第一个非空结果,我们需要链接另一个操作,如果第一个结果为 null
,则会转而使用 thenCombine
。这只有在两个 futures 都已完成时才会完成,但此时我们已经知道更快的结果是 null
,需要第二个结果。当两个结果都为 null
时,整个代码仍将导致 null
。
请注意,anyOf
接受任意类型的 futures,并产生一个 CompletableFuture<Object>
。因此,i
的类型为 Object
,需要进行类型转换。具有完全类型安全性的替代方法是
CompletableFuture<Integer> future1 = fetchAsync(1);
CompletableFuture<Integer> future2 = fetchAsync(2);
Integer result = future1.applyToEither(future2, Function.identity())
.thenCompose(i -> i != null?
CompletableFuture.completedFuture(i):
future1.thenCombine(future2, (a, b) -> a != null? a: b))
.join();
这需要我们指定一个我们不需要的函数,因此这段代码使用了 Function.identity()
。你也可以使用 i -> i
来表示一个恒等函数;这主要是一种风格选择。
请注意,大多数复杂性源于设计,试图通过始终将依赖操作链接到前一阶段完成后执行来避免阻塞线程。上面的示例遵循此原则,因为最终的
join()
调用仅用于演示目的;如果调用方期望未来而不是被阻止,则可以轻松删除它并返回未来。
如果您无论如何都要执行最终的阻塞
join()
,因为您需要立即获得结果值,则还可以使用。
Integer result = future1.applyToEither(future2, Function.identity()).join();
if(result == null) {
Integer a = future1.join(), b = future2.join();
result = a != null? a: b;
}
这样可能更容易阅读和调试。这种易用性是即将推出的虚拟线程功能背后的动力。当一个操作在虚拟线程上运行时,您不需要避免阻塞调用。因此,使用此功能,如果您仍然需要返回一个CompletableFuture
而不阻塞您的调用线程,您可以使用
CompletableFuture<Integer> resultFuture = future1.applyToEitherAsync(future2, r-> {
if(r != null) return r;
Integer a = future1.join(), b = future2.join();
return a != null? a: b;
}, Executors.newVirtualThreadPerTaskExecutor());
通过为依赖操作请求虚拟线程,我们可以在函数内部使用阻塞的
join()
调用而不必犹豫,这使得代码更简单,实际上类似于之前的非异步变体。
在所有情况下,如果代码是非空的,则会提供更快的结果,而不必等待第二个future完成。但它不会停止不必要的future的评估。已经进行中的评估不能被
CompletableFuture
支持。您可以在其上调用
cancel(…)
,但这只会
将未来的完成状态(结果)设置为“异常完成,带有CancellationException
”。
因此,无论您是否调用
cancel
,已经进行中的评估都将在后台继续进行,并且仅忽略其最终结果。
对于某些操作,这可能是可以接受的。如果不行,您将不得不显着更改 fetchAsync
实现。您可以直接使用 ExecutorService
并 submit
操作以获得支持中断的 Future
。
但这也需要操作代码对中断敏感并具有实际效果:
CompletableFuture
? - Jackresult1
的类型更改为Integer
,否则它根本不可能是null
。然后,if (result1 == null) return future2.get(); else return result1;
使用CompletableFuture
没有方法可以“停止进程”。 - Holgerfuture1
,有时会得到future2
,因为没有保证。所以,我想要获取第一个响应并检查其值。然后根据这个值获取第二个响应或停止第二个线程。我不必仅使用CompletableFuture
来解决问题。如果您有任何关于Thread或Runnable的建议,如果您可以通过发布示例代码分享,我将不胜感激。 - JackCompletableFuture
在幕后使用Runnable
和Thread
。我不知道您认为通过手动处理Runnable
和Thread
可以获得什么优势。如果您想停止任务,您需要一个支持取消的ExecutorService
,它返回真正支持取消的Future
对象。但是,这仍然需要对您的任务进行主动中断支持,即由fetchAsync
封装的操作。 - Holger