你观察到的是预期行为——但当然这是非常糟糕的。很幸运,已经存在已知的解决方案和最佳实践来防范这种情况。在这个答案中,我想花一些时间简短、详细地解释这个问题——享受阅读吧!
简短回答:“不要阻塞路由基础架构!”,始终使用专用的调度程序来处理阻塞操作!
观察到的症状的原因:问题在于你正在使用context.dispatcher
作为阻塞future执行的调度程序。相同的调度程序(简单来说只是一组线程)被路由基础架构用于实际处理传入请求,因此如果你阻塞了所有可用线程,就会导致路由基础架构饥饿。(一个值得辩论和基准测试的问题是,Akka HTTP能否保护免受此影响,我将把它添加到我的研究待办事项列表中。)
阻塞必须特别小心地处理,以免影响同一调度程序的其他用户(这就是为什么我们使它如此简单,可以将执行分离到不同的调度程序中),正如Akka文档部分所解释的那样:阻塞需要仔细管理。
我想在这里提醒大家的是,如果可能的话,应该尽量避免阻塞API - 如果你的长时间运行操作实际上不是一个操作,而是一系列操作,那么你可以将它们分开到不同的actors或序列化的futures中。无论如何,我只是想指出 - 如果可能的话,避免这样的阻塞调用,但如果必须这样做,那么下面解释了如何正确处理。
深入分析和解决方案:
既然我们知道了概念上的问题,现在让我们看看以上代码中到底有什么问题,以及解决这个问题的正确方法是什么:
颜色=线程状态:
- 青绿色-SLEEPING
- 橙色-WAITING
- 绿色-RUNNABLE
现在让我们研究3个代码片段以及它们如何影响调度程序和应用程序的性能。为了强制执行此行为,该应用程序已承受以下负载:
- [a]不断请求GET请求(请参见上述初始问题中的代码),在那里不会阻塞
- [b]然后过一段时间触发2000个POST请求,这将导致在返回future之前阻塞5秒钟
1) [bad]
错误代码时的调度行为:
implicit val defaultDispatcher = system.dispatcher
val routes: Route = post {
complete {
Future {
Thread.sleep(5000)
System.currentTimeMillis().toString
}
}
}
因此,我们将我们的应用程序暴露给[一个]负载,您可以看到许多akka.actor.default-dispatcher线程 - 它们正在处理请求 - 小绿色片段和橙色表示其他线程实际上处于空闲状态。
![blocking is killing the default dispatcher](https://istack.dev59.com/ciWxv.webp)
然后我们开始[b]加载,导致这些线程阻塞 - 你可以看到一个早期的线程"default-dispatcher-2,3,4"在空闲后进入阻塞。我们还观察到池增长 - 新线程启动"default-dispatcher-18,19,20,21...",但它们立即进入睡眠状态(!) - 我们在浪费宝贵的资源!这样启动的线程数量取决于默认调度程序配置,但可能不会超过50个左右。由于我们刚刚触发了2k个阻塞操作,我们使整个线程池饥饿 - 阻塞操作占主导地位,路由基础设施没有可用线程来处理其他请求 - 非常糟糕!让我们采取一些措施(这是Akka最佳实践 - 始终像下面所示隔离阻塞行为):
2) [好!]
调度程序行为良好的结构化代码/调度程序:
在您的application.conf
中配置此专用于阻塞行为的调度程序:
my-blocking-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
// in Akka previous to 2.4.2:
core-pool-size-min = 16
core-pool-size-max = 16
max-pool-size-min = 16
max-pool-size-max = 16
// or in Akka 2.4.2+
fixed-pool-size = 16
}
throughput = 100
}
你应该阅读
Akka调度程序文档,以了解此处的各种选项。然而,主要的一点是我们选择了一个
ThreadPoolExecutor
,它有一个硬线程限制,用于保持可用于阻塞操作的线程数。大小设置取决于你的应用程序执行什么操作以及服务器有多少个核心。
接下来,我们需要使用它,而不是默认的线程池:
implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher")
val routes: Route = post {
complete {
Future {
Thread.sleep(5000)
System.currentTimeMillis().toString
}
}
}
我们会使用相同的负载来测试应用程序,首先进行一些正常请求,然后再添加阻塞请求。这就是在这种情况下ThreadPools的行为方式:
![the blocking pool scales to our needs](https://istack.dev59.com/VUjKR.webp)
最初,普通请求由默认调度程序轻松处理,您可以看到几条绿色线-这是实际执行(我没有真正使服务器承受重负,所以它大多数时间处于空闲状态)。
现在,当我们开始发出阻塞操作时,my-blocking-dispatcher-*
启动并达到配置的线程数。它在其中处理所有睡眠操作。此外,在这些线程上长时间没有任何活动后,它会关闭它们。如果我们向服务器发送另一堆阻塞,则池将启动新线程来处理它们的睡眠(sleep()),但同时-我们不会浪费珍贵的线程“只是呆在那里什么也不做”。
使用此设置时,普通GET请求的吞吐量不受影响,它们仍然在(仍然相当空闲的)默认调度程序上快乐地服务。
这是处理反应式应用程序中任何类型的阻塞的推荐方式。它经常被称为隔离应用程序的问题部分的“散装头部”(或“隔离”),在这种情况下,不良行为是睡眠/阻塞。
3) [workaround-ish]
当适当应用blocking
时,调度程序的行为:
在这个例子中,我们使用
scala.concurrent.blocking
的scaladoc方法,当面临阻塞操作时可以帮助。它通常会导致更多的线程被启动以应对阻塞操作。
implicit val dispatcher = system.dispatcher
val routes: Route = post {
complete {
Future {
blocking {
Thread.sleep(5000)
System.currentTimeMillis().toString
}
}
}
}
应用程序将表现如下:
![blocking causes more threads to be started](https://istack.dev59.com/jLKOq.webp)
您会注意到创建了很多新的线程,这是因为阻塞提示“噢,这将是阻塞的,所以我们需要更多的线程”。这使得我们被阻塞的总时间比1)示例中更小,但是在阻塞操作完成后就有数百个线程无事可做......当然,它们最终会被关闭(FJP会这样做),但是一段时间内我们将有大量(不受控制的)线程在运行,与2)解决方案形成对比,我们知道为阻塞行为分配了确切的线程数。
总结:永远不要阻塞默认的调度程序 :-)
最佳实践是使用2)中显示的模式,将一个调度程序用于阻塞操作,并在那里执行它们。
讨论的Akka HTTP版本:2.0.1
使用的分析工具: 很多人私下里问我上面图片中可视化线程状态所用的分析工具是什么,所以在这里添加这个信息:我使用了YourKit,它是一个很棒的商业分析工具(开源软件免费),不过你也可以使用免费的OpenJDK的VisualVM达到相同的效果。