如何为futures配置一个经过精细调整的线程池?

80

Scala的Future线程池有多大?

我的Scala应用程序创建了许多数百万个future {},我想知道是否有什么可以优化它们的方法,例如配置线程池。

谢谢。


Slick 3.0使用自己的连接和线程池,那么为什么在它管理自己的线程池时还需要提供隐式执行上下文给Slick呢? - Rahul Gulabani
1
@RahulGulabani,《Essential Slick》一书中提到:mapflatMap方法允许在连接操作时调用任意代码。Slick无法让这些代码在其自身的执行上下文中运行,因为它无法知道您是否会长时间占用Slick的线程。 - srzhio
4个回答

159

这个答案来自于monkjack,是采纳答案下的评论。然而,有些人可能会错过这个很棒的答案,所以我在这里重新发布。

implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

如果您只需要更改线程池数量,只需使用全局执行器并传递以下系统属性。

-Dscala.concurrent.context.numThreads=8 -Dscala.concurrent.context.maxThreads=8

2
这是一个更优雅的解决方案。 - Ben Hutchison
1
我尝试了这两个选项,值都设为5,但仍然看到最多8个线程同时运行。 - micseydel

90

您可以指定自己的ExecutionContext,以便运行在其中的future,而不是导入全局隐式ExecutionContext。

import java.util.concurrent.Executors
import scala.concurrent._

implicit val ec = new ExecutionContext {
    val threadPool = Executors.newFixedThreadPool(1000)

    def execute(runnable: Runnable) {
        threadPool.submit(runnable)
    }

    def reportFailure(t: Throwable) {}
}

74
非常好的回答,您可以通过使用ExecutionContext上的辅助方法来减少样板代码,该方法允许您直接从给定的Executor实例化。例如:implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10)) - sksamuel
1
虽然这一切都很好,但是 implicits.global 上的线程是否有真正的限制?如果有,是否可以像 Akka 一样通过 application.conf 进行配置? - Nick
6
@Nick 是的,implicits.global 只使用每个 CPU 核心 1 个线程。这对于 CPU 密集型任务来说是最优的。但是对于经典的阻塞 IO(例如 JDBC)来说,它会导致性能灾难。 - Ben Hutchison
2
我需要在使用完后添加一个调用来关闭线程池,否则程序永远不会终止... def shutdown() = threadPool.shutdown() - justinhj
2
为什么在“正常”情况下它会关闭,但当我们将implicit设置为其他值时却不会关闭? - hbogert

3

在Scala Futures中指定线程池的最佳方法:

implicit val ec = new ExecutionContext {
      val threadPool = Executors.newFixedThreadPool(conf.getInt("5"));
      override def reportFailure(cause: Throwable): Unit = {};
      override def execute(runnable: Runnable): Unit = threadPool.submit(runnable);
      def shutdown() = threadPool.shutdown();
    }

-1
class ThreadPoolExecutionContext(val executionContext: ExecutionContext)

object ThreadPoolExecutionContext {

  val executionContextProvider: ThreadPoolExecutionContext = {
    try {
      val executionContextExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(25))
      new ThreadPoolExecutionContext(executionContextExecutor)
    } catch {
      case exception: Exception => {
        Log.error("Failed to create thread pool", exception)
        throw exception
      }
    }
  }
}

这里没有描述你想要实现什么。 - Joshua Stafford

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接