Akka演员系统与HTTP接口

4

我正在尝试创建一个Akka系统,其中包括响应HTTP请求。我创建了一些交换消息良好的actors,也可以使用akka-http来响应HTTP请求。问题在于连接这两部分。

简而言之: 如何在akka-http请求处理期间与Akka actors通信?

我创建了一个单独的actor负责启动HTTP系统:

class HttpActor extends Actor with ActorLogging  {
  /* implicits elided */

  private def initHttp() = {
    val route: Route =  path("status") { get { complete { "OK" } } }
    Http()(context.system).bindAndHandle(route, "localhost", 8080)
  }
  private var bound: Option[Future[Http.ServerBinding]] = None

  override def receive = {
    case HttpActor.Init =>
      bound match {
        case Some(x) => log.warning("Http already bootstrapping")
        case None =>
          bound = Some(initHttp(watcher))
      }

  }
}

object HttpActor {
  case object Init
}

正如您所看到的,演员在收到第一条消息时创建了akka-http服务(没有原因,它也可以在构造函数中完成)。

现在,在处理请求期间,我需要与其他演员进行通信,但我无法使其正常工作。

我的方法:

  private def initInteractiveHttp() = {
    val route: Route =  path("status") { 
      get { complete { "OK" } } 
    } ~ path("ask") {
      get { complete {
        // Here are the interesting two lines:
        val otherActorResponse = someOtherActor ? SomeMessage
        otherActorResponse.mapTo[String]
    } }
    Http()(context.system).bindAndHandle(route, "localhost", 8080)
  }

这会将 SomeMessage 发送给 someOtherActor 并在完成请求-响应周期之前等待响应。然而据我所知,这些消息将从根 HttpActor 发送,这很糟糕,并且在可扩展性方面没有任何好处。理想情况下,我会为每个请求创建一个专门的 Actor 实例,但由于 akka-http 类型限制,这种方法会失败。请考虑以下示例:

class DisposableActor(httpContext: HttpContext) {
    override def preStart() = {
       // ... send some questions to other actors
    }
    override def receive = {
      case GotAllData(x) => httpContext.complete(x)
    }
}

class HttpActorWithDisposables {
  // there is a `context` value in scope - we're an actor, after all
  private def initHttpWithDisposableActors() = {
    val route: Route =  path("status") { 
      get { complete { "OK" } } 
    } ~ path("ask") {
      get { httpContext =>
        val props = Props(new DisposableActor(httpContext))
        val disposableActor = context.actorOf(props, "disposable-actor-name")
        // I have nothing to return here
      }
    }
    Http()(context.system).bindAndHandle(route, "localhost", 8080)
  }

通过这种方式,我可以强制DisposableActor在某个时间点调用httpContext.complete。这应该能正确地结束请求-响应处理周期。然而,Route DSL要求在get块内返回有效的响应(或Future),因此这种方法不起作用。
1个回答

4
您的第一种方法实际上非常好。ask模式为您创建了一个轻量级、一次性的演员,等待结果(非阻塞地)完成未来任务。它基本上做了您想要用DisposableActor和主HttpActor进行复制的所有事情,并且不会给您的主HttpActor带来压力。
如果您仍然想使用其他演员,则可以使用completeWith指令:
completeWith(instanceOf[String]) { complete =>
  // complete is of type String => Unit
  val props = Props(new DisposableActor(complete))
  val disposableActor = context.actorOf(props, "disposable-actor-name")

  // completeWith expects you to return unit
}

在你的演员中,当你获得结果时,请调用complete函数。

class DisposableActor(complete: String => Unit) {
  override def receive = {
    case GotAllData(x) => 
      complete(x)
      context stop self // don't for get to stop the actor
  }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接