Scala异步/等待和并行化

42

我正在学习Scala中异步/等待的用法。我在https://github.com/scala/async上读到了这个。

理论上,此代码是异步(非阻塞)的,但它没有并行化:

def slowCalcFuture: Future[Int] = ...             
def combined: Future[Int] = async {               
   await(slowCalcFuture) + await(slowCalcFuture)
}
val x: Int = Await.result(combined, 10.seconds)    

而这另一个则进行了并行处理:

def combined: Future[Int] = async {
  val future1 = slowCalcFuture
  val future2 = slowCalcFuture
  await(future1) + await(future2)
}

它们之间唯一的区别在于使用了中间变量。这会如何影响并行化?

3个回答

54

由于这个概念类似于C#中的async&await,所以也许我能提供一些见解。在C#中,一个可以使用await等待的Task应该是“热”的,即已经在运行。我假设在Scala中也是一样的,从函数返回的Future不需要显式地启动,只是在调用后自动“运行”。 如果不是这种情况,则以下纯属(可能不真实的)猜测。

让我们分析第一种情况:

async {
    await(slowCalcFuture) + await(slowCalcFuture)
}

我们到达那个代码块并执行第一个await:

async {
    await(slowCalcFuture) + await(slowCalcFuture)
    ^^^^^
}

好的,所以我们正在异步等待这个计算完成。当它完成时,我们会“继续”分析这个区块:

async {
    await(slowCalcFuture) + await(slowCalcFuture)
                            ^^^^^
}

第二个await,所以我们正在异步等待第二个计算完成。在完成后,我们可以通过添加两个整数来计算最终结果。
如您所见,我们正在逐步移动await,按顺序等待Future一个接一个地到来。
让我们看一下第二个例子:
async {
  val future1 = slowCalcFuture
  val future2 = slowCalcFuture
  await(future1) + await(future2)
}

好的,这里是(可能)发生的事情:

async {
  val future1 = slowCalcFuture // >> first future is started, but not awaited
  val future2 = slowCalcFuture // >> second future is started, but not awaited
  await(future1) + await(future2)
  ^^^^^
}

我们正在等待第一个Future,但是两个Futures目前都在运行。当第一个返回时,第二个可能已经完成(因此我们将立即获得结果),或者我们可能需要再等待一段时间。

现在很明显,第二个示例在并行运行两个计算,然后等待它们都完成。当两个都准备好时,它就会返回。第一个示例以非阻塞的方式顺序运行计算。


非常有帮助的解释。现在我明白了。谢谢! :-) - Sanete

24

如果有点难以理解,但Patryk的回答是正确的。关于async/await的主要理解是它只是做 FutureflatMap 的另一种方式。在幕后并没有并发魔法。所有的调用都在async块中按顺序执行,包括await,它实际上不会阻塞执行线程,而是将async块的其余部分封装在闭包中,并将其作为 Future 完成时的回调函数传递。因此,在第一段代码中,由于还没有人启动第二个计算,所以第二个计算直到第一个 await 完成后才开始执行。


дҪ зҡ„еӣһзӯ”зңҹзҡ„еҫҲжңүз”ЁпјҢжҲ‘еңЁйҳ…иҜ»е…ідәҺasync/awaitзҡ„ж–ҮжЎЈж—¶жғізҹҘйҒ“зҡ„жҳҜпјҢжІЎжңүдёҖдёӘж–ҮжЎЈжҸҗеҲ°дёҚдјҡе®һйҷ…йҳ»еЎһжү§иЎҢзәҝзЁӢиҝҷ件дәӢгҖӮ - Freewind

1
在第一种情况下,您创建一个新线程来执行一个慢速任务,并在单个调用中等待它完成。因此,在第一个任务完成后才执行第二个慢速任务。
在第二种情况下,当调用val future1 = slowCalcFuture时,它实际上创建了一个新线程,将指向“slowCalcFuture”函数的指针传递给线程,并说“请执行它”。获取线程实例并将指向函数的指针传递给线程实例需要花费足够的时间。这可以被认为是瞬间完成的。因此,因为val future1 = slowCalcFuture被转换为“获取线程并传递指针”操作,所以它在很短的时间内完成,而下一行代码val future2 = slowCalcFuture也会立即执行。特性2也会立即安排执行。 val future1 = slowCalcFutureawait(slowCalcFuture)之间的根本区别就像是要求别人为你做咖啡和等待咖啡准备好一样。询问需要2秒钟:即说出短语“请你给我做咖啡”。但等待咖啡准备好需要4分钟。
可能对此任务的修改是等待第一个可用答案。例如,您想连接到集群中的任何服务器。您向您所知道的每个服务器发出连接请求,并且响应的第一个服务器将成为您的服务器。您可以使用以下方式完成此操作: Future.firstCompletedOf(Array(slowCalcFuture, slowCalcFuture))

但是,一个单线程能否“分裂”成两个其他线程?如果+操作可以允许加法的“并行”计算,那么我们不能有并行性吗?在Scala中是否有“+”的“并行版本”? - Jose Cabrera Zuniga
不太确定您的意思。线程并不会被“生成”到其他线程中,但任务是可以的。因此,您可以使任何返回Future的函数,并在生成的线程中执行它。@JoseCabreraZuniga - Vadym Chekan

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接