我正在尝试创建一个Akka系统,其中包括响应HTTP请求。我创建了一些交换消息良好的actors,也可以使用akka-http来响应HTTP请求。问题在于连接这两部分。
简而言之: 如何在akka-http请求处理期间与Akka actors通信?
我创建了一个单独的actor负责启动HTTP系统:
class HttpActor extends Actor with ActorLogging {
/* implicits elided */
private def initHttp() = {
val route: Route = path("status") { get { complete { "OK" } } }
Http()(context.system).bindAndHandle(route, "localhost", 8080)
}
private var bound: Option[Future[Http.ServerBinding]] = None
override def receive = {
case HttpActor.Init =>
bound match {
case Some(x) => log.warning("Http already bootstrapping")
case None =>
bound = Some(initHttp(watcher))
}
}
}
object HttpActor {
case object Init
}
正如您所看到的,演员在收到第一条消息时创建了akka-http服务(没有原因,它也可以在构造函数中完成)。
现在,在处理请求期间,我需要与其他演员进行通信,但我无法使其正常工作。
我的方法:
private def initInteractiveHttp() = {
val route: Route = path("status") {
get { complete { "OK" } }
} ~ path("ask") {
get { complete {
// Here are the interesting two lines:
val otherActorResponse = someOtherActor ? SomeMessage
otherActorResponse.mapTo[String]
} }
Http()(context.system).bindAndHandle(route, "localhost", 8080)
}
这会将 SomeMessage
发送给 someOtherActor
并在完成请求-响应周期之前等待响应。然而据我所知,这些消息将从根 HttpActor
发送,这很糟糕,并且在可扩展性方面没有任何好处。理想情况下,我会为每个请求创建一个专门的 Actor 实例,但由于 akka-http 类型限制,这种方法会失败。请考虑以下示例:
class DisposableActor(httpContext: HttpContext) {
override def preStart() = {
// ... send some questions to other actors
}
override def receive = {
case GotAllData(x) => httpContext.complete(x)
}
}
class HttpActorWithDisposables {
// there is a `context` value in scope - we're an actor, after all
private def initHttpWithDisposableActors() = {
val route: Route = path("status") {
get { complete { "OK" } }
} ~ path("ask") {
get { httpContext =>
val props = Props(new DisposableActor(httpContext))
val disposableActor = context.actorOf(props, "disposable-actor-name")
// I have nothing to return here
}
}
Http()(context.system).bindAndHandle(route, "localhost", 8080)
}
通过这种方式,我可以强制DisposableActor在某个时间点调用httpContext.complete。这应该能正确地结束请求-响应处理周期。然而,Route DSL要求在get块内返回有效的响应(或Future),因此这种方法不起作用。