我将@Hbf的回答标记为解决方案,因为我最终按照How do I wrap a java.util.concurrent.Future in an Akka Future?中所述创建了一个Akka轮询器。供参考,我还尝试过:
- 创建HystrixCommandExcutionHook并扩展HystrixCommand以允许回调。这并没有起作用,因为挂钩在错误的时间没有被调用。
- 使用Guavas可监听的future,通过装饰的执行器在Hystrix中创建futures,然后从命令中转换futures。不起作用,因为Hystrix使用无法装饰的ThreadPoolExecutor。
编辑:由于原始答案是用Scala编写的,并且如果Java future没有正常取消则会挂起,因此我在下面添加了Akka轮询器代码。下面的解决方案总是在超时后退出线程。
protected Future wrapJavaFutureInAkkaFuture(final java.util.concurrent.Future javaFuture, final Option maybeTimeout, final ActorSystem actorSystem) {
final Promise promise = Futures.promise();
if (maybeTimeout.isDefined()) {
pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, Option.option(maybeTimeout.get().fromNow()), actorSystem);
} else {
pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, Option. none(), actorSystem);
}
return promise.future();
}
protected void pollJavaFutureUntilDoneOrCancelled(final java.util.concurrent.Future javaFuture, final Promise promise, final Option maybeTimeout, final ActorSystem actorSystem) {
if (maybeTimeout.isDefined() && maybeTimeout.get().isOverdue()) {
javaFuture.cancel(true);
promise.failure(new ExecutionException(new TimeoutException("Future timed out after " + maybeTimeout.get())));
} else if (javaFuture.isDone()) {
try {
promise.success(javaFuture.get());
} catch (final Exception e) {
promise.failure(e);
}
} else {
actorSystem.scheduler().scheduleOnce(Duration.create(50, TimeUnit.MILLISECONDS), new Runnable() {
@Override
public void run() {
pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, maybeTimeout, actorSystem);
}
}, actorSystem.dispatcher());
}
}