Java 8引入了CompletableFuture
,这是一种可组合的Future实现(包括一系列thenXxx方法)。我希望只使用它,但我想使用的许多库只返回不可组合的Future
实例。
有没有一种方式可以将返回的Future
实例包装在CompleteableFuture
中,以便我可以组合它?
如果你想使用的库除了Future风格之外也提供了回调函数风格的方法,你可以提供一个处理器来完成CompletableFuture而不会有任何额外的线程阻塞。像这样:
AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file"));
// ...
CompletableFuture<ByteBuffer> completableFuture = new CompletableFuture<ByteBuffer>();
open.read(buffer, position, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
completableFuture.complete(buffer);
}
@Override
public void failed(Throwable exc, Void attachment) {
completableFuture.completeExceptionally(exc);
}
});
completableFuture.thenApply(...)
如果没有回调函数,我唯一看到的解决方法是使用轮询循环,将所有Future.isDone()
检查放在单个线程上,并在可以获取 Future 时调用 complete。
Future
是调用ExecutorService
方法(例如submit()
)的结果,最简单的方法是使用CompletableFuture.runAsync(Runnable, Executor)
方法。Runnbale myTask = ... ;
Future<?> future = myExecutor.submit(myTask);
to
Runnbale myTask = ... ;
CompletableFuture<?> future = CompletableFuture.runAsync(myTask, myExecutor);
然后,"CompletableFuture"是通过本地方式创建的。
编辑:根据@SamMefford和@MartinAndersson的评论,如果要传递一个Callable
,需要调用supplyAsync()
将Callable<T>
转换为Supplier<T>
,例如:
CompletableFuture.supplyAsync(() -> {
try { return myCallable.call(); }
catch (Exception ex) { throw new CompletionException(ex); } // Or return default value
}, myExecutor);
T Callable.call() throws Exception;
会抛出异常,而T Supplier.get();
不会抛出异常,因此您必须捕获异常以使原型兼容。get()
方法没有指定throws
,这意味着它不应该抛出已检查异常。但是,可以使用未检查异常。在CompletableFuture
中的代码显示,使用了CompletionException
并且是未检查的(即RuntimeException
),因此需要通过catch/throw将任何异常包装成CompletionException
。handle()
方法来处理可能由结果引发的异常:CompletableFuture<T> future = CompletableFuture.supplyAsync(...);
future.handle((ex,res) -> {
if (ex != null) {
// An exception occurred ...
} else {
// No exception was thrown, 'res' is valid and can be handled here
}
});
CompletableFuture<T> future = CompletableFuture.supplyAsync(myCallable, myExecutor);
- Sam MeffordsupplyAsync
接收一个 Supplier
。如果您尝试传入一个 Callable
,代码将无法编译。 - Martin AnderssonCallable<T>
转换为 Supplier<T>
。 - Matthieu有一种方法,但你可能不喜欢。以下方法将 Future<T>
转换为 CompletableFuture<T>
:
public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
if (future.isDone())
return transformDoneFuture(future);
return CompletableFuture.supplyAsync(() -> {
try {
if (!future.isDone())
awaitFutureIsDoneInForkJoinPool(future);
return future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
// Normally, this should never happen inside ForkJoinPool
Thread.currentThread().interrupt();
// Add the following statement if the future doesn't have side effects
// future.cancel(true);
throw new RuntimeException(e);
}
});
}
private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
CompletableFuture<T> cf = new CompletableFuture<>();
T result;
try {
result = future.get();
} catch (Throwable ex) {
cf.completeExceptionally(ex);
return cf;
}
cf.complete(result);
return cf;
}
private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
throws InterruptedException {
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
@Override public boolean block() throws InterruptedException {
try {
future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
return true;
}
@Override public boolean isReleasable() {
return future.isDone();
}
});
}
CompletableFuture.supplyAsync(supplier, new SinglethreadExecutor())
هڈ¯èƒ½ن¸چه®Œç¾ژ,ن½†è‡³ه°‘ن¸چن¼ڑéک»ه،ه…¬ه…±و± ç؛؟程م€‚ - MikeFHayFJP.ManagedBlocker
将比 CompletableFuture.supplyAsync(supplier, t -> newSingleThreadExecutor())
产生更好的利用效果。我已经更新了答案中的代码以反映这一点。 - leventovhttps://gabfssilva.github.io/old-java-future-to-completable-future/
但是,基本上:public class CompletablePromiseContext {
private static final ScheduledExecutorService SERVICE = Executors.newSingleThreadScheduledExecutor();
public static void schedule(Runnable r) {
SERVICE.schedule(r, 1, TimeUnit.MILLISECONDS);
}
}
public class CompletablePromise<V> extends CompletableFuture<V> {
private Future<V> future;
public CompletablePromise(Future<V> future) {
this.future = future;
CompletablePromiseContext.schedule(this::tryToComplete);
}
private void tryToComplete() {
if (future.isDone()) {
try {
complete(future.get());
} catch (InterruptedException e) {
completeExceptionally(e);
} catch (ExecutionException e) {
completeExceptionally(e.getCause());
}
return;
}
if (future.isCancelled()) {
cancel(true);
return;
}
CompletablePromiseContext.schedule(this::tryToComplete);
}
}
例子:
public class Main {
public static void main(String[] args) {
final ExecutorService service = Executors.newSingleThreadExecutor();
final Future<String> stringFuture = service.submit(() -> "success");
final CompletableFuture<String> completableFuture = new CompletablePromise<>(stringFuture);
completableFuture.whenComplete((result, failure) -> {
System.out.println(result);
});
}
}
CompletablePromiseContext
设为非静态,并接受检查间隔的参数(此处设置为 1 毫秒),然后重载 CompletablePromise<V>
构造函数,以便能够提供自己的 CompletablePromiseContext
,其中包含可能不同(更长)的检查间隔,用于长时间运行的 Future<V>
,在这种情况下,您不必绝对能够立即运行回调(或组合),还可以拥有 CompletablePromiseContext
实例来监视一组 Future
(如果您有许多)。 - Dexter LegaspiCompletableTask<V>
接口--CompletionStage<V>
+ RunnableFuture<V>
的结合ExecutorService
进行封装,使其从 submit(...)
方法返回 CompletableTask
(而不是 Future<V>
)J8ExecutorService exec = J8Executors.newCachedThreadPool();
CompletionStage<String> = exec
.submit( someCallableA )
.thenCombineAsync( exec.submit(someCallableB), (a, b) -> a + " " + b)
.thenCombine( exec.submit(someCallableC), (ab, b) -> ab + " " + c);
public static <T> CompletableFuture<T> fromFuture(Future<T> f) {
return CompletableFuture.completedFuture(null).thenCompose(avoid -> {
try {
return CompletableFuture.completedFuture(f.get());
} catch (InterruptedException e) {
return CompletableFuture.failedFuture(e);
} catch (ExecutionException e) {
return CompletableFuture.failedFuture(e.getCause());
}
});
}
主要思路如下:
Future<?> future = null;
return CompletableFuture.supplyAsync(future::get);
然而,你的编译器会发出一些警告。
因此,这是第一个选项。
Future<?> future = null;
return CompletableFuture.supplyAsync(
()->{
try {
return future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
第二个选项,通过将函数接口强制转换来隐藏try...catch。
@FunctionalInterface
public interface MySupplier<T> extends Supplier<T> {
@Override
default T get() {
try {
return getInternal();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
T getInternal() throws Exception;
}
public static void main(String[] args) {
Future<?> future = null;
return CompletableFuture.supplyAsync((MySupplier<?>) future::get);
}
第三个选项,找到一些提供了这种功能接口的第三方库。
另请参阅:Java 8 Lambda函数抛出异常?