Twitter的未来。collect无法并发工作(Scala)

5

作为一个来自node.js背景的人,我对Scala还不太熟悉。我尝试使用Twitter的Future.collect来进行一些简单的并发操作,但我的代码表现出顺序行为而非并发行为。我做错了什么?

以下是我的代码:

import com.twitter.util.Future

def waitForSeconds(seconds: Int, container:String): Future[String]  = Future[String] {
  Thread.sleep(seconds*1000)
  println(container + ": done waiting for " + seconds + " seconds")
  container + " :done waiting for " + seconds + " seconds"
}

def mainFunction:String = {
  val allTasks = Future.collect(Seq(waitForSeconds(1, "All"), waitForSeconds(3, "All"), waitForSeconds(2, "All")))
  val singleTask = waitForSeconds(1, "Single")

  allTasks onSuccess  { res =>
    println("All tasks succeeded with result " + res)
  }

  singleTask onSuccess { res =>
    println("Single task succeeded with result " + res)
  }

  "Function Complete"
}

println(mainFunction)

这是我得到的输出结果:

All: done waiting for 1 seconds
All: done waiting for 3 seconds
All: done waiting for 2 seconds
Single: done waiting for 1 seconds
All tasks succeeded with result ArraySeq(All :done waiting for 1 seconds, All :done waiting for 3 seconds, All :done waiting for 2 seconds)
Single task succeeded with result Single :done waiting for 1 seconds
Function Complete

我期望的输出结果是:
All: done waiting for 1 seconds
Single: done waiting for 1 seconds
All: done waiting for 2 seconds
All: done waiting for 3 seconds
All tasks succeeded with result ArraySeq(All :done waiting for 1 seconds, All :done waiting for 3 seconds, All :done waiting for 2 seconds)
Single task succeeded with result Single :done waiting for 1 seconds
Function Complete

Future 的整个意义在于将计算卸载到另一个线程中。因此,在 mainFunction 的第一行中,将会生成三个新线程,这些线程完成后将生成另一个线程来执行 oncomplete 函数。 在第二行中,另一个线程被生成并将与其他线程并行执行。因此,我认为实际输出似乎是有效的... - irundaia
1个回答

9
Twitter的Future比Scala标准库的Future更明确地说明了计算在哪里执行。特别是,Future.apply将可安全捕获异常(像s.c.Future一样),但不会说明计算将在哪个线程中运行。在您的情况下,计算在主线程中运行,这就是您看到的结果的原因。
这种方法比标准库的future API有几个优点。首先,它使方法签名更简单,因为不需要隐式传递 ExecutionContext。更重要的是,它使避免上下文切换变得更容易(Brian Degenhardt在这里进行了经典解释)。在这方面,Twitter的Future更像Scalaz的Task,并具有基本相同的性能优势(例如在此博客文章中描述)。
更明确地说明计算在哪里运行的缺点是,您必须更明确地说明计算在哪里运行。在您的情况下,您可以编写类似以下内容的代码:
import com.twitter.util.{ Future, FuturePool }

val pool = FuturePool.unboundedPool

def waitForSeconds(seconds: Int, container:String): Future[String] = pool {
  Thread.sleep(seconds*1000)
  println(container + ": done waiting for " + seconds + " seconds")
  container + " :done waiting for " + seconds + " seconds"
}

这不会完全产生您所要求的输出(“函数完成”将首先被打印出来,而allTaskssingleTask不是相互排序的),但它将在单独的线程上并行运行任务。
(作为脚注:我上面示例中的FuturePool.unboundedPool是创建演示用未来池的简便方法,并且通常很好,但它不适用于CPU密集型计算-请参见 FuturePool API文档以了解其他方式来创建将使用您提供并且可以自己管理的ExecutorService的未来池。)

1
太好了! 我使用了你的代码,并在最后一行添加了Thread.sleep(6000),现在我看到了并发行为。更改后的输出结果为:功能完成 所有:完成等待1秒钟 单个:完成等待1秒钟 单个任务成功,结果为Single:完成等待1秒钟 所有:完成等待2秒钟 所有:完成等待3秒钟 所有任务成功,结果为ArraySeq(All:完成等待1秒,All:完成等待3秒,All:完成等待2秒) - Ram
2
再补充一点。最近Twitter发布了一份关于其Futures的文档,其中解释了一些设计决策:https://github.com/twitter/finagle/blob/develop/doc/src/sphinx/developers/Futures.rst - Vladimir Kostyukov
1
(com.twitter.util.)使用Await.result(allTasks)可能比Thread.sleep(6000)更好。 - user4933928

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接