我最近在一台机器上遇到了一些奇怪的行为,当map
一个返回Future[T]
的函数时,它是顺序执行的。但是,在其他机器上不会发生这个问题:工作会像预期的那样交错进行。后来我发现,这可能是因为Scala太聪明了,选择了一个与该机器资源匹配的ExecutionContext
:一个核心,一个worker。
下面是一些复现问题的简单代码:
import scala.concurrent._
import scala.concurrent.duration._
val ss = List("the", "quick", "brown", "fox",
"jumped", "over", "the", "lazy", "dog")
def r(s: String) : Future[String] = future {
for (i <- 1 to 10) yield {
println(String.format("r(%s) waiting %s more seconds...",
s, (10 - i).toString))
Thread.sleep(1000)
}
s.reverse
}
val f_revs = ss.map { r(_) }
println("Look ma, no blocking!")
val rev = f_revs.par.map { Await.result(_, Duration.Inf) }.mkString(" ")
println(rev)
在表现出奇怪行为的计算机上运行此程序会产生如下连续输出:
Look ma, no blocking!
r(the) waiting 9 more seconds...
r(the) waiting 8 more seconds...
r(the) waiting 7 more seconds...
提供自定义
ExecutionContext
:val pool = Executors.newFixedThreadPool(1)
implicit val ec = ExecutionContext.fromExecutor(pool)
允许线程在此计算机上交错。但我现在有一个新问题:线程池不会关闭,导致程序无限期挂起。显然,这是FixedThreadPool
的预期行为,我可以通过在某个地方放置pool.shutdown()
来关闭它。
请原谅我的固执,但我不想告诉线程池要关闭。是否有一种方法可以配置池,在所有队列为空时(可能需要一些延迟),就像默认池一样关闭?我已经查看了ExecutionContext
文档,但没有找到我要找的内容。