Akka HTTP:在未来中阻塞将阻塞服务器

38

我正在尝试使用Akka HTTP对我的请求进行基本身份验证。 恰好我有一个外部资源可以通过身份验证,因此我必须向这个资源发出rest调用。

这需要一些时间,在处理过程中,似乎我的API其余部分被阻塞,等待这个调用。 我已经用一个非常简单的例子重现了这个问题:

// used dispatcher:
implicit val system = ActorSystem()
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()


val routes = 
  (post & entity(as[String])) { e =>
    complete {
      Future{
        Thread.sleep(5000)
        e
      }
    }
  } ~
  (get & path(Segment)) { r =>
    complete {
      "get"
    }
  }
如果我向日志端点发布内容,我的获取端点也会被卡在等待5秒钟的时间上,这是由日志端点决定的。
这是预期行为吗?如果是,那么如何进行阻塞操作而不阻塞整个API?
2个回答

134

你观察到的是预期行为——但当然这是非常糟糕的。很幸运,已经存在已知的解决方案和最佳实践来防范这种情况。在这个答案中,我想花一些时间简短、详细地解释这个问题——享受阅读吧!

简短回答:“不要阻塞路由基础架构!”,始终使用专用的调度程序来处理阻塞操作!

观察到的症状的原因:问题在于你正在使用context.dispatcher作为阻塞future执行的调度程序。相同的调度程序(简单来说只是一组线程)被路由基础架构用于实际处理传入请求,因此如果你阻塞了所有可用线程,就会导致路由基础架构饥饿。(一个值得辩论和基准测试的问题是,Akka HTTP能否保护免受此影响,我将把它添加到我的研究待办事项列表中。)

阻塞必须特别小心地处理,以免影响同一调度程序的其他用户(这就是为什么我们使它如此简单,可以将执行分离到不同的调度程序中),正如Akka文档部分所解释的那样:阻塞需要仔细管理

我想在这里提醒大家的是,如果可能的话,应该尽量避免阻塞API - 如果你的长时间运行操作实际上不是一个操作,而是一系列操作,那么你可以将它们分开到不同的actors或序列化的futures中。无论如何,我只是想指出 - 如果可能的话,避免这样的阻塞调用,但如果必须这样做,那么下面解释了如何正确处理。

深入分析和解决方案:

既然我们知道了概念上的问题,现在让我们看看以上代码中到底有什么问题,以及解决这个问题的正确方法是什么:

颜色=线程状态:

  • 青绿色-SLEEPING
  • 橙色-WAITING
  • 绿色-RUNNABLE

现在让我们研究3个代码片段以及它们如何影响调度程序和应用程序的性能。为了强制执行此行为,该应用程序已承受以下负载:

  • [a]不断请求GET请求(请参见上述初始问题中的代码),在那里不会阻塞
  • [b]然后过一段时间触发2000个POST请求,这将导致在返回future之前阻塞5秒钟

1) [bad] 错误代码时的调度行为:

// BAD! (due to the blocking in Future):
implicit val defaultDispatcher = system.dispatcher

val routes: Route = post { 
  complete {
    Future { // uses defaultDispatcher
      Thread.sleep(5000)                    // will block on the default dispatcher,
      System.currentTimeMillis().toString   // starving the routing infra
    }
  }
}

因此,我们将我们的应用程序暴露给[一个]负载,您可以看到许多akka.actor.default-dispatcher线程 - 它们正在处理请求 - 小绿色片段和橙色表示其他线程实际上处于空闲状态。

blocking is killing the default dispatcher

然后我们开始[b]加载,导致这些线程阻塞 - 你可以看到一个早期的线程"default-dispatcher-2,3,4"在空闲后进入阻塞。我们还观察到池增长 - 新线程启动"default-dispatcher-18,19,20,21...",但它们立即进入睡眠状态(!) - 我们在浪费宝贵的资源!这样启动的线程数量取决于默认调度程序配置,但可能不会超过50个左右。由于我们刚刚触发了2k个阻塞操作,我们使整个线程池饥饿 - 阻塞操作占主导地位,路由基础设施没有可用线程来处理其他请求 - 非常糟糕!让我们采取一些措施(这是Akka最佳实践 - 始终像下面所示隔离阻塞行为):

2) [好!] 调度程序行为良好的结构化代码/调度程序:

在您的application.conf中配置此专用于阻塞行为的调度程序:

my-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    // in Akka previous to 2.4.2:
    core-pool-size-min = 16
    core-pool-size-max = 16
    max-pool-size-min = 16
    max-pool-size-max = 16
    // or in Akka 2.4.2+
    fixed-pool-size = 16
  }
  throughput = 100
}

你应该阅读Akka调度程序文档,以了解此处的各种选项。然而,主要的一点是我们选择了一个ThreadPoolExecutor,它有一个硬线程限制,用于保持可用于阻塞操作的线程数。大小设置取决于你的应用程序执行什么操作以及服务器有多少个核心。
接下来,我们需要使用它,而不是默认的线程池:
// GOOD (due to the blocking in Future):
implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher")

val routes: Route = post { 
  complete {
    Future { // uses the good "blocking dispatcher" that we configured, 
             // instead of the default dispatcher – the blocking is isolated.
      Thread.sleep(5000)
      System.currentTimeMillis().toString
    }
  }
}

我们会使用相同的负载来测试应用程序,首先进行一些正常请求,然后再添加阻塞请求。这就是在这种情况下ThreadPools的行为方式:

the blocking pool scales to our needs

最初,普通请求由默认调度程序轻松处理,您可以看到几条绿色线-这是实际执行(我没有真正使服务器承受重负,所以它大多数时间处于空闲状态)。

现在,当我们开始发出阻塞操作时,my-blocking-dispatcher-*启动并达到配置的线程数。它在其中处理所有睡眠操作。此外,在这些线程上长时间没有任何活动后,它会关闭它们。如果我们向服务器发送另一堆阻塞,则池将启动新线程来处理它们的睡眠(sleep()),但同时-我们不会浪费珍贵的线程“只是呆在那里什么也不做”。

使用此设置时,普通GET请求的吞吐量不受影响,它们仍然在(仍然相当空闲的)默认调度程序上快乐地服务。

这是处理反应式应用程序中任何类型的阻塞的推荐方式。它经常被称为隔离应用程序的问题部分的“散装头部”(或“隔离”),在这种情况下,不良行为是睡眠/阻塞。

3) [workaround-ish]当适当应用blocking时,调度程序的行为

在这个例子中,我们使用 scala.concurrent.blocking的scaladoc方法,当面临阻塞操作时可以帮助。它通常会导致更多的线程被启动以应对阻塞操作。
// OK, default dispatcher but we'll use `blocking`
implicit val dispatcher = system.dispatcher

val routes: Route = post { 
  complete {
    Future { // uses the default dispatcher (it's a Fork-Join Pool)
      blocking { // will cause much more threads to be spun-up, avoiding starvation somewhat, 
                 // but at the cost of exploding the number of threads (which eventually
                 // may also lead to starvation problems, but on a different layer)
        Thread.sleep(5000)
        System.currentTimeMillis().toString
       }
    }
  }
}

应用程序将表现如下:

blocking causes more threads to be started

您会注意到创建了很多新的线程,这是因为阻塞提示“噢,这将是阻塞的,所以我们需要更多的线程”。这使得我们被阻塞的总时间比1)示例中更小,但是在阻塞操作完成后就有数百个线程无事可做......当然,它们最终会被关闭(FJP会这样做),但是一段时间内我们将有大量(不受控制的)线程在运行,与2)解决方案形成对比,我们知道为阻塞行为分配了确切的线程数。

总结:永远不要阻塞默认的调度程序 :-)

最佳实践是使用2)中显示的模式,将一个调度程序用于阻塞操作,并在那里执行它们。

讨论的Akka HTTP版本2.0.1

使用的分析工具: 很多人私下里问我上面图片中可视化线程状态所用的分析工具是什么,所以在这里添加这个信息:我使用了YourKit,它是一个很棒的商业分析工具(开源软件免费),不过你也可以使用免费的OpenJDK的VisualVM达到相同的效果。


1
我们现在已将此回复包含在官方文档中:http://doc.akka.io/docs/akka/2.4/scala/http/handling-blocking-operations-in-akka-http-routes.html#handling-blocking-in-http-routes-scala - Konrad 'ktoso' Malawski
以上链接已失效。 - Abhijit Sarkar
如果我想返回一个响应并在后台继续工作怎么办?看起来这个是有效的。 - Abhijit Sarkar
1
没问题。Akka流在不同的调度程序上工作。这应该更多地是一个新的顶级问题,而不是劫持这个线程。 - Konrad 'ktoso' Malawski
这似乎足够相关,可以在这里提问,但现在我已经为超时创建了一个单独的问题 - Abhijit Sarkar
阻塞调度程序是否需要使用任何类型的路由? - Diego Ramos

3
奇怪的是,对我来说一切都正常(没有被阻止)。这里是代码:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer

import scala.concurrent.Future


object Main {

  implicit val system = ActorSystem()
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val routes: Route = (post & entity(as[String])) { e =>
    complete {
      Future {
        Thread.sleep(5000)
        e
      }
    }
  } ~
    (get & path(Segment)) { r =>
      complete {
        "get"
      }
    }

  def main(args: Array[String]) {

    Http().bindAndHandle(routes, "0.0.0.0", 9000).onFailure {
      case e =>
        system.shutdown()
    }
  }
}

您可以将异步代码包装在 onCompleteonSuccess 指令中:
onComplete(Future{Thread.sleep(5000)}){e} 

onSuccess(Future{Thread.sleep(5000)}){complete(e)}

没错,我也是。我刚刚用 akka-http 2.0.1 进行了测试。 - expert
你也可以尝试将Future包装在onComplete/onSuccess指令中。 - Igor Mielientiev

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接