Scala流的生产和处理的多线程化

3
假设我有一个相当标准的生产者/消费者问题需要在Scala中编码,具有以下结构:
  1. 构建一个惰性生成元素的Stream或Iterator。
  2. 在Stream或Iterator上使用map或foreach处理这些元素并对其进行操作。
这似乎很好用,但它看起来是单线程的:当我们想要处理一个新元素时,我们要求生成它,在它被生成后,然后开始处理它。我真正想要的是一种机制,使生成可以在处理前一个元素时继续。是否有一种方法让Scala做到这一点?
我知道我可以使用BlockingQueue,但这对我来说似乎太命令式了。我希望有一种方法可以让一个Stream在另一个线程中不断生成元素。
一旦我们提前生成它们,它就不再是惰性评估了。但我也不想急于评估整个流。我希望有一个函数式范例的BlockingQueue模拟。

你应该研究一下响应式流。请参考 http://reactive-streams.org 。使用Scala和Akka实现的版本已经达到了1.0里程碑1状态。https://groups.google.com/d/msg/akka-user/PPleJEfI5sM/EpSGOK2Pah4J。每个流处理阶段都在自己的Actor上运行,因此您应该能够获得非常好的并发性。从长远来看,这甚至可以让您将流处理管道分布在多台机器上。 - Rüdiger Klaehn
有各种基于迭代器的方法,可以使双方都适当地异步。我认为scalaz-stream是当前在这个方向上的主要努力。 "连续"进程允许完全反应性的东西(尽管我没有直接使用过它们); 我所做的就是通过将完全纯净的“任务”与谨慎使用“unsafeStart”相结合来“伪造”它。 - lmm
我认为 scalaz-stream 是最符合你要求的。Akka streams 也提供了一个对 Akka actors 进行流处理的函数式封装。这两者之间最大的区别在于 Akka streams 支持反压(back pressure)。 - Soumya Simanta
1个回答

1
你可以按照以下方式将流中的项目映射到处理的未来中:
def process(x: Int): Int = // do something time consuming
val asyncProducer = Stream.from(0).map(x => future { process(x)})

现在这不会产生任何结果,因为Stream不会生成项目直到你尝试将它们实现,就像你建议的流程一样。因此,如果您想要触发下10个项目的处理,只需像这样实现它们即可:
val futureResults = asyncProducer.take(10).toList

这将启动10个并行进程(取决于您在范围内的ExecutionContext),并产生一个 List [Future [Int]]。为了能够接收所有这些工作项,您可以将future列表序列化为一个列表的future:

val futureResult = Future.sequence(futureResults)

现在,您可以将这个未来映射为一个结果列表,并将它们交给某个收件人,然后开始下一块处理。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接