如何使用Akka actors处理Java future?

4

我有一个Java Web应用程序中的分层架构。UI层只是Java,服务层使用类型化的Akka actors和外部服务调用(WS、DB等)被包装在Hystrix命令中。

UI调用服务,服务返回一个Akka future。这是一个Akka future,因为我想通过onComplete和onFailure回调使UI编码更简单,这是Akka futures提供的。然后服务创建了一个future,进行一些映射等操作,并包装了一个调用返回Java future的HystrixCommand。

因此,伪代码如下:

UI

AkkaFuture future = service.getSomeData();

服务

public AkkaFuture getSomeData() {
    return future {
        JavaFuture future = new HystrixCommand(mapSomeData()).queue()
        //what to do here, currently just return future.get()
    }
}

问题在于,我希望释放服务Actor正在使用的线程,并仅占用Hystrix使用的线程。但是Java Future会阻止这样做,因为我必须在其完成时阻塞它。我能想到的唯一选择(我不确定是否喜欢)是不断轮询Java Future,并在Java Future完成时完成Akka Future。
注意:问题实际上与Hystrix本身无关,但如果有人提出特定于Hystrix的解决方案,我决定提及它。
3个回答

3

我将@Hbf的回答标记为解决方案,因为我最终按照How do I wrap a java.util.concurrent.Future in an Akka Future?中所述创建了一个Akka轮询器。供参考,我还尝试过:

  • 创建HystrixCommandExcutionHook并扩展HystrixCommand以允许回调。这并没有起作用,因为挂钩在错误的时间没有被调用。
  • 使用Guavas可监听的future,通过装饰的执行器在Hystrix中创建futures,然后从命令中转换futures。不起作用,因为Hystrix使用无法装饰的ThreadPoolExecutor。

编辑:由于原始答案是用Scala编写的,并且如果Java future没有正常取消则会挂起,因此我在下面添加了Akka轮询器代码。下面的解决方案总是在超时后退出线程。


    protected  Future wrapJavaFutureInAkkaFuture(final java.util.concurrent.Future javaFuture, final Option maybeTimeout, final ActorSystem actorSystem) {
      final Promise promise = Futures.promise();
        if (maybeTimeout.isDefined()) {
          pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, Option.option(maybeTimeout.get().fromNow()), actorSystem);
        } else {
          pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, Option. none(), actorSystem);
        }

        return promise.future();
    }

    protected  void pollJavaFutureUntilDoneOrCancelled(final java.util.concurrent.Future javaFuture, final Promise promise, final Option maybeTimeout, final ActorSystem actorSystem) {
      if (maybeTimeout.isDefined() && maybeTimeout.get().isOverdue()) {
        // on timeouts, try to cancel the Java future and simply walk away
        javaFuture.cancel(true);
        promise.failure(new ExecutionException(new TimeoutException("Future timed out after " + maybeTimeout.get())));

      } else if (javaFuture.isDone()) {
        try {
          promise.success(javaFuture.get());
        } catch (final Exception e) {
          promise.failure(e);
        }
      } else {
            actorSystem.scheduler().scheduleOnce(Duration.create(50, TimeUnit.MILLISECONDS), new Runnable() {
          @Override
          public void run() {
            pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, maybeTimeout, actorSystem);
          }
        }, actorSystem.dispatcher());
      }
    }

2
Java的Future在设计上被认为不如Scala的Future。例如,看看"如何将java.util.concurrent.Future包装到Akka Future中"的讨论。
但是:也许,除了像上面讨论中建议的轮询之外,Hystrix还提供了一种onComplete回调?我完全不了解这个库,但偶然发现了Hystrix API中的onComplete,也许它有所帮助?

2

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接