将Java Future转换为CompletableFuture

136

Java 8引入了CompletableFuture,这是一种可组合的Future实现(包括一系列thenXxx方法)。我希望只使用它,但我想使用的许多库只返回不可组合的Future实例。

有没有一种方式可以将返回的Future实例包装在CompleteableFuture中,以便我可以组合它?

8个回答

73

如果你想使用的库除了Future风格之外也提供了回调函数风格的方法,你可以提供一个处理器来完成CompletableFuture而不会有任何额外的线程阻塞。像这样:

    AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file"));
    // ... 
    CompletableFuture<ByteBuffer> completableFuture = new CompletableFuture<ByteBuffer>();
    open.read(buffer, position, null, new CompletionHandler<Integer, Void>() {
        @Override
        public void completed(Integer result, Void attachment) {
            completableFuture.complete(buffer);
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            completableFuture.completeExceptionally(exc);
        }
    });
    completableFuture.thenApply(...)

如果没有回调函数,我唯一看到的解决方法是使用轮询循环,将所有Future.isDone()检查放在单个线程上,并在可以获取 Future 时调用 complete。


1
我正在使用Apache Http异步库,它接受FutureCallback。这让我的生活变得轻松 :) - Abhishek Gayakwad

69
如果你的Future是调用ExecutorService方法(例如submit())的结果,最简单的方法是使用CompletableFuture.runAsync(Runnable, Executor)方法。
Runnbale myTask = ... ;
Future<?> future = myExecutor.submit(myTask);

to

Runnbale myTask = ... ;
CompletableFuture<?> future = CompletableFuture.runAsync(myTask, myExecutor);

然后,"CompletableFuture"是通过本地方式创建的。

编辑:根据@SamMefford和@MartinAndersson的评论,如果要传递一个Callable,需要调用supplyAsync()Callable<T>转换为Supplier<T>,例如:

CompletableFuture.supplyAsync(() -> {
    try { return myCallable.call(); }
    catch (Exception ex) { throw new CompletionException(ex); } // Or return default value
}, myExecutor);

由于T Callable.call() throws Exception;会抛出异常,而T Supplier.get();不会抛出异常,因此您必须捕获异常以使原型兼容。
关于异常处理的说明 get()方法没有指定throws,这意味着它不应该抛出已检查异常。但是,可以使用未检查异常。在CompletableFuture中的代码显示,使用了CompletionException并且是未检查的(即RuntimeException),因此需要通过catch/throw将任何异常包装成CompletionException
另外,正如@WeGa所指出的那样,您可以使用handle()方法来处理可能由结果引发的异常:
CompletableFuture<T> future = CompletableFuture.supplyAsync(...);
future.handle((ex,res) -> {
        if (ex != null) {
            // An exception occurred ...
        } else {
            // No exception was thrown, 'res' is valid and can be handled here
        }
    });

2
或者,如果您使用的是Callable<T>而不是Runnable,请尝试使用supplyAsync:CompletableFuture<T> future = CompletableFuture.supplyAsync(myCallable, myExecutor); - Sam Mefford
1
supplyAsync 接收一个 Supplier。如果您尝试传入一个 Callable,代码将无法编译。 - Martin Andersson
@MartinAndersson 没错,谢谢。我进一步编辑将 Callable<T> 转换为 Supplier<T> - Matthieu
1
您还可以使用java.util.concurrent.CompletableFuture#handle实现异常处理,紧随CompletableFuture之后进行链接:CompletableFuture.supplyAsync(...).handle((response, throwable) -> ...) - WeGa

62

有一种方法,但你可能不喜欢。以下方法将 Future<T> 转换为 CompletableFuture<T>

public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
  if (future.isDone())
    return transformDoneFuture(future);
  return CompletableFuture.supplyAsync(() -> {
    try {
      if (!future.isDone())
        awaitFutureIsDoneInForkJoinPool(future);
      return future.get();
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    } catch (InterruptedException e) {
      // Normally, this should never happen inside ForkJoinPool
      Thread.currentThread().interrupt();
      // Add the following statement if the future doesn't have side effects
      // future.cancel(true);
      throw new RuntimeException(e);
    }
  });
}

private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
  CompletableFuture<T> cf = new CompletableFuture<>();
  T result;
  try {
    result = future.get();
  } catch (Throwable ex) {
    cf.completeExceptionally(ex);
    return cf;
  }
  cf.complete(result);
  return cf;
}

private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
    throws InterruptedException {
  ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
    @Override public boolean block() throws InterruptedException {
      try {
        future.get();
      } catch (ExecutionException e) {
        throw new RuntimeException(e);
      }
      return true;
    }
    @Override public boolean isReleasable() {
      return future.isDone();
    }
  });
}

显然,这种方法的问题在于,对于每个Future,都会有一个线程被阻塞以等待Future的结果--这与Futures的想法相矛盾。在某些情况下,可能可以做得更好。然而,一般情况下,没有不主动等待Future结果的解决方案。

15
嗯...这个解决方案会占用"公共池"的一个线程来等待吗?"公共池"的线程不应该阻塞...嗯... - Peti
1
@Peti:你说得没错。但是要点在于,如果你很可能正在做一些错误的事情,无论你使用的是常规池还是无界线程池都是如此。 - nosid
4
ن½؟用CompletableFuture.supplyAsync(supplier, new SinglethreadExecutor())هڈ¯èƒ½ن¸چه®Œç¾ژ,ن½†è‡³ه°‘ن¸چن¼ڑéک»ه،‍ه…¬ه…±و± ç؛؟程م€‚ - MikeFHay
8
请,千万不要那样做。 - Laymain
2
@MikeFHay 我认为使用 FJP.ManagedBlocker 将比 CompletableFuture.supplyAsync(supplier, t -> newSingleThreadExecutor()) 产生更好的利用效果。我已经更新了答案中的代码以反映这一点。 - leventov
显示剩余3条评论

15

我发布了一个小项目futurity,旨在比答案中的直接方法更好。

主要思想是仅使用一个线程(当然不是一个忙等待循环)来检查其中所有Future状态,这有助于避免为每个Future -> CompletableFuture转换阻塞线程池中的线程。

用法示例:

Future oldFuture = ...;
CompletableFuture profit = Futurity.shift(oldFuture);

1
这看起来很有趣。它是使用定时器线程吗?为什么这不是被接受的答案? - Kira
1
@Kira 是的,它基本上使用一个计时器线程等待所有已提交的future。 - Dmitry Spikhalskiy

9
建议:

https://gabfssilva.github.io/old-java-future-to-completable-future/

但是,基本上:
public class CompletablePromiseContext {
    private static final ScheduledExecutorService SERVICE = Executors.newSingleThreadScheduledExecutor();

    public static void schedule(Runnable r) {
        SERVICE.schedule(r, 1, TimeUnit.MILLISECONDS);
    }
}

而且,CompletablePromise:
public class CompletablePromise<V> extends CompletableFuture<V> {
    private Future<V> future;

    public CompletablePromise(Future<V> future) {
        this.future = future;
        CompletablePromiseContext.schedule(this::tryToComplete);
    }

    private void tryToComplete() {
        if (future.isDone()) {
            try {
                complete(future.get());
            } catch (InterruptedException e) {
                completeExceptionally(e);
            } catch (ExecutionException e) {
                completeExceptionally(e.getCause());
            }
            return;
        }

        if (future.isCancelled()) {
            cancel(true);
            return;
        }

        CompletablePromiseContext.schedule(this::tryToComplete);
    }
}

例子:

public class Main {
    public static void main(String[] args) {
        final ExecutorService service = Executors.newSingleThreadExecutor();
        final Future<String> stringFuture = service.submit(() -> "success");
        final CompletableFuture<String> completableFuture = new CompletablePromise<>(stringFuture);

        completableFuture.whenComplete((result, failure) -> {
            System.out.println(result);
        });
    }
}

1
这很容易理解,优雅并适用于大多数用例。我会将 CompletablePromiseContext 设为非静态,并接受检查间隔的参数(此处设置为 1 毫秒),然后重载 CompletablePromise<V> 构造函数,以便能够提供自己的 CompletablePromiseContext,其中包含可能不同(更长)的检查间隔,用于长时间运行的 Future<V>,在这种情况下,您不必绝对能够立即运行回调(或组合),还可以拥有 CompletablePromiseContext 实例来监视一组 Future(如果您有许多)。 - Dexter Legaspi

6
让我提出另一个(希望更好的)选项:https://github.com/vsilaev/java-async-await/tree/master/com.farata.lang.async.examples/src/main/java/com/farata/concurrent 简要来说,想法是这样的:
  1. 引入 CompletableTask<V> 接口--CompletionStage<V> + RunnableFuture<V> 的结合
  2. ExecutorService 进行封装,使其从 submit(...) 方法返回 CompletableTask (而不是 Future<V>
  3. 完成,我们现在拥有可运行和可组合的 Futures。
实现使用了一种备用的 CompletionStage 实现(请注意,这里使用的是CompletionStage 而不是 CompletableFuture):
用法:
J8ExecutorService exec = J8Executors.newCachedThreadPool();
CompletionStage<String> = exec
   .submit( someCallableA )
   .thenCombineAsync( exec.submit(someCallableB), (a, b) -> a + " " + b)
   .thenCombine( exec.submit(someCallableC), (ab, b) -> ab + " " + c); 

2
小更新:代码已移动到单独的项目 https://github.com/vsilaev/tascalate-concurrent,并且现在可以使用来自 java.util.concurrent 的开箱即用 Executor-s。 - Valery Silaev

4
public static <T> CompletableFuture<T> fromFuture(Future<T> f) {
    return CompletableFuture.completedFuture(null).thenCompose(avoid -> {
        try {
            return CompletableFuture.completedFuture(f.get());
        } catch (InterruptedException e) {
            return CompletableFuture.failedFuture(e);
        } catch (ExecutionException e) {
            return CompletableFuture.failedFuture(e.getCause());
        }
    });
}

0

主要思路如下:

Future<?> future = null;
return CompletableFuture.supplyAsync(future::get);

然而,你的编译器会发出一些警告。

因此,这是第一个选项。

Future<?> future = null;
return CompletableFuture.supplyAsync(
        ()->{
            try {
                return future.get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

第二个选项,通过将函数接口强制转换来隐藏try...catch。

    @FunctionalInterface
    public interface MySupplier<T> extends Supplier<T> {
        @Override
        default T get() {
            try {
                return getInternal();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        T getInternal() throws Exception;
    }


    public static void main(String[] args) {
        Future<?> future = null;
        return CompletableFuture.supplyAsync((MySupplier<?>) future::get);

    }


第三个选项,找到一些提供了这种功能接口的第三方库。

另请参阅:Java 8 Lambda函数抛出异常?


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接