Scala中的并行迭代器

16

是否可以使用Scala的并行集合,而不需要预先完全评估它,来并行化Iterator

这里我谈论的是在Iterator上并行化函数转换,即mapflatMap。我认为这需要预先评估Iterator的一些元素,然后在通过next消耗某些元素后计算更多元素。

我找到的所有资料都要求将迭代器转换为Iterable或者最好是Stream。然而当我在Stream上调用.par时,它就会被完全评估。

如果没有现成的解决方案,我也欢迎实现提议。实现应支持并行mapflatMap


答案很可能是否定的,但你能否多说一点关于你想要的内容?特别是,计算应该在什么时候开始运行——在创建迭代器之后,还是一旦调用某个强制评估的东西后? - Rex Kerr
@RexKerr 看起来像是一种设计选择;但让它在第一次请求时启动会使第一次请求变得有些特殊。我目前正在尝试实现类似的东西,我选择立即开始运行并存储接下来的 n 个结果。一旦一个被使用,我就计算一个替换。 - ziggystar
4个回答

6

我知道这是一个老问题,但 iterata 库中的 ParIterator 实现是否符合您的要求?

scala> import com.timgroup.iterata.ParIterator.Implicits._
scala> val it = (1 to 100000).toIterator.par().map(n => (n + 1, Thread.currentThread.getId))
scala> it.map(_._2).toSet.size
res2: Int = 8 // addition was distributed over 8 threads

1
它解决了这个问题。不过,如果在一个块内的操作运行时间有很大的变化,它可能会更有效率一些,因为你会遇到很多阻塞。 - ziggystar
@ziggystar,如何使它更有效率? - ms-tg
ParIteratorIterator分成块。因此,如果您有小块(例如大小为2),其中一个元素需要1秒,另一个元素需要10秒,则并行化效果不佳。不同的实现可以在工作线程空闲时从迭代器中提供新元素。 - ziggystar
@ziggystar 在迭代中的 ParIterator 将这种考虑推迟到了标准库并行集合中。因此,在单个块内,Scala 并行集合是如何运作的呢? - ms-tg
1
我不确定你是否理解我的观点。即使在一个块内尽可能做到最好,分块也会创造障碍,无法进行并行化。这意味着您无法获得最大的CPU利用率。另一个缺点是更高的内存需求。为了并行化Scala需要强制执行它们,这导致整个块同时存在于内存中(假设迭代器创建对象)。理论上,您只需要在当前处理的元素中拥有内存即可。大块->好par /差mem和小块->差par /好mem。 - ziggystar
啊,好的,你反对在一般情况下在并行化之前进行分块。这没问题,但需要更复杂的解决方案。 - ms-tg

4

在标准库中,您最好不要使用并行集合,而是使用concurrent.Future.traverse

import concurrent._
import ExecutionContext.Implicits.global
Future.traverse(Iterator(1,2,3))(i => Future{ i*i })

虽然我认为这将立即启动整个过程。

2

来自ML,在并行遍历迭代器元素方面:

https://groups.google.com/d/msg/scala-user/q2NVdE6MAGE/KnutOq3iT3IJ

针对类似的原因,我放弃了使用Future.traverse。对于我的用例,保持N个任务在工作,我最终编写了代码来限制从作业队列中提供执行上下文的速度。

我的第一次尝试涉及阻塞进料线程,但这也可能会阻止想要在执行上下文中生成任务的任务。你知道的,阻塞是邪恶的。


你能否解释一下为什么要使用(NUM_CPUs + 1)^2作为阻塞队列的大小? - ziggystar
我也通过艰难的方式发现了以下两点:1. 我不擅长并发编程;2. flatMap 更加困难。 - ziggystar
@ziggystar 你所说的“你”是指ML上的“Juha”。我认为这不是一个魔法数字:足够大,以便消费者不会超过原始迭代器(可能会进行I/O,也许)加上映射函数(他说是CPU绑定的,但运行时间长或短?)。我看到未来提供给队列的数据将在没有调用“blocking”的情况下阻塞;也许+1是从“期望并行性”中剩下的。我的解决方案是管道的末端检查是否有更多的工作要做,即工作的最后一件事是检查是否有足够的工作正在进行中,如果没有,就喂养野兽。我同意这很难,简单是关键。 - som-snytt
这看起来工作得很好,而且 API 比 Future.traverse 简单多了。 我将它与 iterator.grouped 结合使用,以便将元素分成块,这样可以减少开销。 - samthebest

0

有点难以理解你的具体需求,但也许是这样:

val f = (x: Int) => x + 1
val s = (0 to 9).toStream map f splitAt(6) match { 
  case (left, right) => left.par; right 
}

这将并行地计算前6个元素上的f,并返回其余部分的流。


这似乎不是并行运行的 - 你需要把 map f 移到 par 后面吗? - DNA

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接