如何为Scala ExecutionContext配置关闭策略?

4

我最近在一台机器上遇到了一些奇怪的行为,当map一个返回Future[T]的函数时,它是顺序执行的。但是,在其他机器上不会发生这个问题:工作会像预期的那样交错进行。后来我发现,这可能是因为Scala太聪明了,选择了一个与该机器资源匹配的ExecutionContext:一个核心,一个worker。

下面是一些复现问题的简单代码:

import scala.concurrent._
import scala.concurrent.duration._

val ss = List("the", "quick", "brown", "fox",
              "jumped", "over", "the", "lazy", "dog")

def r(s: String) : Future[String] = future {
        for (i <- 1 to 10) yield {
                println(String.format("r(%s) waiting %s more seconds...",
                        s, (10 - i).toString))
                Thread.sleep(1000)

        }
        s.reverse
}

val f_revs = ss.map { r(_) }

println("Look ma, no blocking!")

val rev = f_revs.par.map { Await.result(_, Duration.Inf) }.mkString(" ")

println(rev)

在表现出奇怪行为的计算机上运行此程序会产生如下连续输出:
Look ma, no blocking!
r(the) waiting 9 more seconds...
r(the) waiting 8 more seconds...
r(the) waiting 7 more seconds...

提供自定义ExecutionContext:
val pool = Executors.newFixedThreadPool(1)
implicit val ec = ExecutionContext.fromExecutor(pool)

允许线程在此计算机上交错。但我现在有一个新问题:线程池不会关闭,导致程序无限期挂起。显然,这是FixedThreadPool预期行为,我可以通过在某个地方放置pool.shutdown()来关闭它。

请原谅我的固执,但我不想告诉线程池要关闭。是否有一种方法可以配置池,在所有队列为空时(可能需要一些延迟),就像默认池一样关闭?我已经查看了ExecutionContext文档,但没有找到我要找的内容。

2个回答

2
Scala使用自己的fork-join实现,它的行为与Java的不同,因此默认的ExecutionContext和您使用Executors创建的ExecutionContext之间存在不同的行为。
做到您想要的更容易的方法是设置以下系统属性来配置默认的ExecutionContext:
  1. scala.concurrent.context.minThreads来强制设定线程的最小数量。默认值为1
  2. scala.concurrent.context.numThreads来设定线程数。默认为x1
  3. scala.concurrent.context.maxThreads来强制设定线程的最大数量。默认为x1
这些属性可以是一个数字,也可以是一个数字前面加上x,以指示处理器数量的倍数。要增加线程数,您必须同时更改numThreadsmaxThreads。在您的情况下,将两者都设置为x2应该可以解决问题。

1

看起来Java 7有一些额外的ExecutorService,特别是一个ForkJoinPool,它可以做我想要的事情(即,不需要shutdown()池)。

将池更改为以下内容就足以实现我想要的:

val pool = new java.util.concurrent.ForkJoinPool(5)

据说Java 8拥有更多的服务


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接