Scala中的异步IO与futures

69

假设我要从一些URL中获取(可能很多)图片的列表。我使用Scala,这是我会做的:

import scala.actors.Futures._

// Retrieve URLs from somewhere
val urls: List[String] = ...

// Download image (blocking operation)
val fimages: List[Future[...]] = urls.map (url => future { download url })

// Do something (display) when complete
fimages.foreach (_.foreach (display _))

我对Scala还不太熟悉,所以这看起来有些像魔术:

  • 这是正确的方法吗?如果不是,有什么替代方法吗?
  • 如果我要下载100张图片,它会同时创建100个线程吗,还是会使用线程池?
  • 最后一个指令(display _)会在主线程上执行吗?如果不是,我该如何确保它在主线程上执行?

感谢您的建议!

3个回答

137

在Scala 2.10中使用Futures。它们是Scala团队,Akka团队和Twitter共同开发的,旨在实现更标准化的未来API和实现,以供各种框架使用。我们刚刚发布了一份指南: http://docs.scala-lang.org/overviews/core/futures.html

除了完全非阻塞(默认情况下,尽管我们提供管理阻塞操作的能力)和可以组合,Scala 2.10 Futures还带有一个隐式线程池来执行您的任务,以及一些工具来管理超时。

import scala.concurrent.{future, blocking, Future, Await, ExecutionContext.Implicits.global}
import scala.concurrent.duration._

// Retrieve URLs from somewhere
val urls: List[String] = ...

// Download image (blocking operation)
val imagesFuts: List[Future[...]] = urls.map {
  url => future { blocking { download url } }
}

// Do something (display) when complete
val futImages: Future[List[...]] = Future.sequence(imagesFuts)
Await.result(futImages, 10 seconds).foreach(display)

首先,我们导入了许多内容:

  • future:用于创建future的API。
  • blocking:用于管理阻塞的API。
  • Future:包含许多有用的方法用于future的集合的未来伴随对象。
  • Await:单例对象,用于阻塞未来(将其结果传输到当前线程)。
  • ExecutionContext.Implicits.global:默认的全局线程池,一个ForkJoin池。
  • duration._:用于管理超时时间的实用程序。

imagesFuts 基本上与您最初做的相同- 这里唯一的区别是我们使用了受管阻塞 - blocking。它会通知线程池,您传递给它的代码块包含长时间运行或阻塞操作。这允许池临时生成新的工作程序以确保永远不会发生所有工作程序都被阻塞的情况。这样做是为了防止在阻塞应用程序中发生饥饿(锁定线程池)。请注意,当受管阻塞块中的代码完成时,线程池也会知道-因此它将在那一点上删除多余的工作线程,这意味着池将缩小到其预期大小。

(如果您希望绝对防止创建其他线程,则应使用AsyncIO库,例如Java的NIO库。)

然后,我们使用Future伴随对象的集合方法,将imagesFutsList[Future[...]]转换为Future[List[...]]

Await对象是我们如何确保display在调用线程上执行-- Await.result只需强制当前线程等待传递给它的future完成即可。(这在内部使用受管阻塞。)


技术上:不惜一切代价避免阻塞。只有在没有其他选择的情况下才进行阻塞。 - Viktor Klang
1
但是回答你之前的问题- 是的,如果所有线程都阻塞并且您不使用受管理的阻塞,则默认的FJPool可能会用尽工作线程。而且,您可以创建自己的ExecutionContext,例如使用Swing的invokeLater,并将其明确传递给futImages上的foreach,而不是使用Await.result - Heather Miller
我是一个新手,有一个问题 - imageFuts期货何时开始执行?我想不是在map命令中,因为您是在此开始执行之后附加“侦听器”吗? - User
2
blocking背后的机制是如何工作的?它是否有自己的线程池,还是在我们通过blocking提交任务时会创建新的线程? - maks
4
为什么在 url => future { blocking { download url } } 中要使用阻塞,而不是只使用 url => future { download url } - Incerteza
显示剩余6条评论

5
val all = Future.traverse(urls){ url =>
  val f = future(download url) /*(downloadContext)*/
  f.onComplete(display)(displayContext)
  f
}
Await.result(all, ...)
  1. 在2.10版本中使用scala.concurrent.Future,目前是RC版本。
  2. 它使用隐式的ExecutionContext。
  3. 新的Future文档明确指出,如果值可用,则onComplete(和foreach)可能会立即评估。旧版actors Future也是如此。根据您对显示的要求,您可以提供适当的ExecutionContext(例如单线程执行程序)。如果您只想让主线程等待加载完成,traverse为您提供了一个可等待的未来。

3
  1. Yes, seems fine to me, but you may want to investigate more powerful twitter-util or Akka Future APIs (Scala 2.10 will have a new Future library in this style).

  2. It uses a thread pool.

  3. No, it won't. You need to use the standard mechanism of your GUI toolkit for this (SwingUtilities.invokeLater for Swing or Display.asyncExec for SWT). E.g.

    fimages.foreach (_.foreach(im => SwingUtilities.invokeLater(new Runnable { display im })))
    

谢谢你的回答,我很高兴知道我的方法是合理的!实际上,我正在尝试在Android上使用Scala,所以这会很有用,相比可怕的Java语法! - F.X.
关于第三点,我在你回答之前正在思考并尝试了一些简单的测试用例,似乎它确实在主线程上执行。我只是创建了一个简单的future{"test"},并在其上运行了foreach(s => println(Thread.currentThread.getName()),它打印出了main。我是否有什么误解? - F.X.
我认为Scala控制台会为您键入的每个命令生成线程。我刚刚尝试了 println(...getName()); f.foreach(s => ...getName()) (在一行中),并得到了两次 Thread-20。很奇怪。 - F.X.
@F.X. 我正在尝试在Android上使用Futures,但它们似乎会在主线程上执行,而不考虑像你之前所说的执行上下文。你能理解为什么吗? - Aleyna
@Aleyna 不,我不是这样认为的,但正如Alexey所说,我不会假设它总是这样。这可能与future{"test"}几乎立即完成有关,后续完成块只是检索结果,因此没有使用辅助线程。长时间运行的futures可能会在worker本身中运行其完成块,如果我没记错的话,文档对此没有说明。 - F.X.
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接