使用scalaz-stream,是否可以拆分/分叉然后重新连接流?
举个例子,假设我有以下函数:
使用scalaz-stream,例如在这个例子中,结果会如你所期望的是一个包含数字1到10的元组传递给sink。
然而,如果我们用需要IO操作的东西来替换streamOfNumbers,它实际上会执行两次IO操作。
使用Topic,我能够创建一个发布/订阅进程,正确地复制流中的元素,但它不会缓冲 - 它只是尽可能快地消费整个源,而不管sink的速度。
我可以将其包装在有界队列中,但最终结果感觉比它需要的复杂得多。
在scalaz-stream中是否有更简单的方法来拆分流而不重复源的IO操作呢?
举个例子,假设我有以下函数:
val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10)
val sumOfEvenNumbers = streamOfNumbers.filter(isEven).fold(0)(add)
val sumOfOddNumbers = streamOfNumbers.filter(isOdd).fold(0)(add)
zip( sumOfEven, sumOfOdd ).to( someEffectfulFunction )
使用scalaz-stream,例如在这个例子中,结果会如你所期望的是一个包含数字1到10的元组传递给sink。
然而,如果我们用需要IO操作的东西来替换streamOfNumbers,它实际上会执行两次IO操作。
使用Topic,我能够创建一个发布/订阅进程,正确地复制流中的元素,但它不会缓冲 - 它只是尽可能快地消费整个源,而不管sink的速度。
我可以将其包装在有界队列中,但最终结果感觉比它需要的复杂得多。
在scalaz-stream中是否有更简单的方法来拆分流而不重复源的IO操作呢?
def partition[A](p: A => Boolean): Process1[A, A \/ A] = process1.lift(a => if (p(a)) right(a) else left(a))
。需要添加吗? - Frank S. ThomaspartialTee
来处理这些用例。但是,你的提议对我来说似乎很好。也许我想看到类似于observeIfA,B(Sink[T,B])这样的东西。 - Pavel Chlupacek