使用返回future的函数遍历列表和流程

41

介绍

Scala的Future (从2.10版本开始,现在已经到了2.9.3版本) 是一个可应用函子(applicative functor),这意味着如果我们有一个可遍历类型F,我们可以取一个F[A]和一个函数A => Future[B]并将它们转换成一个Future[F[B]]

这个操作在标准库中被称为 Future.traverseScalaz 7 还提供了一个更通用的 traverse 方法,如果我们从 scalaz-contrib 导入了 Future 的 applicative functor 实例,我们就可以在这里使用它。
这两种 traverse 方法在处理流时表现不同。标准库的遍历会在返回之前消耗掉流,而 Scalaz 的 会立即返回 future
import scala.concurrent._
import ExecutionContext.Implicits.global

// Hangs.
val standardRes = Future.traverse(Stream.from(1))(future(_))

// Returns immediately.
val scalazRes = Stream.from(1).traverse(future(_))

另外还有一个区别,正如Leif Warner这里所观察到的那样。标准库的traverse会立即启动所有异步操作,而Scalaz的则会启动第一个,等待它完成,然后开始第二个,再等待它完成,以此类推。

对于流的不同行为

很容易通过编写一个函数来展示这第二个区别,该函数将在流中的第一个值上睡眠几秒钟:
def howLong(i: Int) = if (i == 1) 10000 else 0

import scalaz._, Scalaz._
import scalaz.contrib.std._

def toFuture(i: Int)(implicit ec: ExecutionContext) = future {
  printf("Starting %d!\n", i)
  Thread.sleep(howLong(i))
  printf("Done %d!\n", i)
  i
}

现在,Future.traverse(Stream(1, 2))(toFuture) 将打印以下内容:
Starting 1!
Starting 2!
Done 2!
Done 1!

而 Scalaz 版本 (Stream(1, 2).traverse(toFuture)):

Starting 1!
Done 1!
Starting 2!
Done 2!

这可能不是我们想要的。

列表呢?

奇怪的是,在列表上,这两个遍历方式在这方面的行为是相同的 - Scalaz 的遍历方式不会等待一个未来完成后再开始下一个。

另一个未来

Scalaz 还包括自己的 concurrent 包,其中包含了自己的未来实现。我们可以使用与上面相同的设置:

import scalaz.concurrent.{ Future => FutureZ, _ }

def toFutureZ(i: Int) = FutureZ {
  printf("Starting %d!\n", i)
  Thread.sleep(howLong(i))
  printf("Done %d!\n", i)
  i
}

然后,我们得到了Scalaz对于列表和流的行为:
Starting 1!
Done 1!
Starting 2!
Done 2!

也许不出所料的是,遍历无限流仍然会立即返回。
问题:
此时我们真的需要一个表来总结,但只能用列表:
- 使用标准库遍历的流:在返回之前消耗;不等待每个 future。 - 使用 Scalaz 遍历的流:立即返回;等待每个 future 完成。 - 使用流的 Scalaz futures:立即返回;等待每个 future 完成。
以及:
- 使用标准库遍历的列表:不等待。 - 使用 Scalaz 遍历的列表:不等待。 - 使用列表的 Scalaz futures:等待每个 future 完成。
这有意义吗?在列表和流上执行此操作是否存在“正确”的行为?是否有某种原因使得“最异步”的行为——即在返回之前不消耗集合,并且在移动到下一个之前不等待每个 future 完成——在这里没有被表示?

2
在“最佳”情况下,对流进行Future.traverse操作将返回一个流(意味着在输出时请求元素时从输入进行惰性读取)的Futures,这些Futures在请求时创建。虽然这是可能的,但实现起来更加困难。 - soulcheck
2
@soulcheck:在这个上下文中,traverse返回一个Future [Stream [B]]——这一部分是不容置疑的。问题是语义应该是什么。 - Travis Brown
1
你说得对,我没有正确地阅读它。现在我也明白你的疑虑来自哪里了。 - soulcheck
1
你看过Scala的traverse源码吗?天啊,那是一些丑陋的Scala代码。特别是那个for :) - soulcheck
1
@soulcheck 我猜您所说的丑陋是指在某种程度上很有特色?我不太懂这个行话。他们现在还叫它“lingo”吗? - som-snytt
显示剩余3条评论
2个回答

1

我无法回答全部问题,但我会尝试回答其中一部分:

为什么这里没有表现出“最异步”的行为——即在返回之前不消耗集合,并且在移动到下一个之前不等待每个future完成的原因是什么?

如果您有依赖计算和有限数量的线程,则可能会遇到死锁。例如,您有两个future依赖于第三个(所有三个都在futures列表中),并且只有两个线程,您可能会遇到一种情况,其中前两个future阻止了所有两个线程,而第三个永远不会被执行。(当然,如果您的池大小为1,即依次执行一个计算,您也可以获得类似的情况)

为解决这个问题,您需要为每个future分配一个线程,没有任何限制。这适用于小的futures列表,但不适用于大的futures列表。因此,如果您将所有内容并行运行,您将会遇到这样的情况:小的例子在所有情况下都可以运行,而更大的例子则会出现死锁。(例如:开发人员测试正常运行,生产环境死锁)

对于列表和流操作,是否存在“正确”的行为方式?

我认为使用 futures 是不可能的。如果您知道更多依赖关系,或者确定计算不会阻塞,则可能有更具并发性的解决方案。但是,在我的看来,执行 futures 列表在设计时存在问题。最好的解决方案似乎是那些对于死锁的小例子就会失败的解决方案(例如一个接一个地执行 Future)。

使用 Scalaz futures 和列表:确保等待每个 future 完成。

我认为 Scalaz 在遍历时内部使用了 for 循环推导。使用 for 循环推导不能保证计算独立。所以我猜测 Scalaz 在这里通过 for 循环推导做了正确的事情:一个接一个地计算。在 futures 的情况下,只要您的操作系统具有无限线程,这将始终有效。
换句话说:您只看到了 for 循环推导必须工作的一种现象。
希望这有些意义。

1
如果我正确理解了问题,我认为这实际上归结为流与列表的语义问题。
遍历列表会按照文档中所述执行:
TraversableOnce[A] 转换为 Future[TraversableOnce[B]],使用提供的函数 A => Future[B]。这对于执行并行映射非常有用。例如,要并行地将一个函数应用于列表的所有项:
对于流而言,它取决于开发者如何希望它工作,因为它需要比编译器更多关于流的知识(流可能是无限的,但类型系统不知道)。如果我的流正在从文件中读取行,我希望先消耗它,因为逐行链接未来实际上不会并行化操作。在这种情况下,我会选择并行方法。
另一方面,如果我的流是一个生成顺序整数并寻找大于某个大数的第一个质数的无限列表,那么在一次扫描中消耗整个流是不可能的(需要链式Future方法,并且我们可能需要从流中运行批处理)。

与其试图找出一种处理这个问题的规范方式,我想知道是否有缺失的类型可以帮助更明确地表达不同情况。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接