我有一段老代码,它返回的是java.util.concurrent.Future类型,现在我需要把这个方法调用封装在一个层级中,并返回reactor publishers Mono或者Flux。我认为转换成它们中的任意一种的方法应该是类似的,那么正确的转换成Mono的方式是什么。
例如,假设我从API获取Future并且我需要Mono。
我有一段老代码,它返回的是java.util.concurrent.Future类型,现在我需要把这个方法调用封装在一个层级中,并返回reactor publishers Mono或者Flux。我认为转换成它们中的任意一种的方法应该是类似的,那么正确的转换成Mono的方式是什么。
例如,假设我从API获取Future并且我需要Mono。
Mono.fromCallable(() -> {
try {
Future future = null;
return future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}).subscribeOn(Schedulers.boundedElastic());
您可以使用 BlockHound https://github.com/reactor/BlockHound 来验证您的代码是否有效地非阻塞。
您可以将Future::get
用作Mono::fromSupplier
中的供应商。
像这样:Mono.fromSupplier(Future::get)
。
Future::get
抛出的是受检异常,无法使用方法引用进行处理。最简单的方法是提供一个带有 try/catch
的 lambda 表达式来调用 get
方法:Mono.fromSupplier(() -> { try { return future.get(); } catch(....) { ... } });
- zshiftCallable
允许检查异常。 - wilmol
Future
转换为CompleteableFuture
,如此处所述 - 因为future在完成时没有“回调”的概念,您必须阻塞线程。这肯定不太好看,但不幸的是没有明智的方法。 https://dev59.com/B2Ag5IYBdhLWcg3ws8ro - Michael Berry