将scalaz-stream进程分裂成两个子流

8
使用scalaz-stream,是否可以拆分/分叉然后重新连接流?
举个例子,假设我有以下函数:
val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10)

val sumOfEvenNumbers = streamOfNumbers.filter(isEven).fold(0)(add)
val sumOfOddNumbers  = streamOfNumbers.filter(isOdd).fold(0)(add)

zip( sumOfEven, sumOfOdd ).to( someEffectfulFunction )

使用scalaz-stream,例如在这个例子中,结果会如你所期望的是一个包含数字1到10的元组传递给sink。
然而,如果我们用需要IO操作的东西来替换streamOfNumbers,它实际上会执行两次IO操作。
使用Topic,我能够创建一个发布/订阅进程,正确地复制流中的元素,但它不会缓冲 - 它只是尽可能快地消费整个源,而不管sink的速度。
我可以将其包装在有界队列中,但最终结果感觉比它需要的复杂得多。
在scalaz-stream中是否有更简单的方法来拆分流而不重复源的IO操作呢?
3个回答

6

另外要澄清之前的回答涉及到“拆分”需求。针对您的具体问题,解决方案可能不需要拆分流:

val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10)
val oddOrEven: Process[Task,Int\/Int] = streamOfNumbers.map {
   case even if even % 2 == 0 => right(even)
   case odd => left(odd)
} 
val summed = oddOrEven.pipeW(sump1).pipeO(sump1)

val evenSink: Sink[Task,Int] = ???
val oddSink: Sink[Task,Int] = ???

summed
.drainW(evenSink)
.to(oddSink)

我在你的回答中看到了一个通用组合器:def partition[A](p: A => Boolean): Process1[A, A \/ A] = process1.lift(a => if (p(a)) right(a) else left(a))。需要添加吗? - Frank S. Thomas
Frank,我认为我们缺少一种更有效的partialTee来处理这些用例。但是,你的提议对我来说似乎很好。也许我想看到类似于observeIfA,B(Sink[T,B])这样的东西。 - Pavel Chlupacek
谢谢。我的实际情况要复杂得多,所以无法这样简化,但这给了我更多的线索来解决问题。 - James Davies

2

您可以尝试使用主题,并确保子进程在您推送到主题之前进行订阅。

但请注意,此解决方案没有任何限制,即如果您推送得太快,可能会遇到OOM错误。

def split[A](source:Process[Task,A]): Process[Task,(Process[Task,A], Proces[Task,A])]] = {
  val topic = async.topic[A]

  val sub1 = topic.subscribe
  val sub2 = topic.subscribe

  merge.mergeN(Process(emit(sub1->sub2),(source to topic.publish).drain))
}

对于动态数量的拆分,使用返回类型Process [Task,Array [Process [Task,A]]]是否有意义?此外,我应该如何修改它以使用Queue而不是Topic并为每个子进程提供唯一的流元素?感谢您的时间。 - Peter Becich

0

我也需要这个功能。我的情况比较棘手,无法以这种方式解决。

感谢Daniel Spiewak在this thread中的回复,我能够让以下内容正常工作。我通过添加onHalt来改进他的解决方案,以便在Process完成后退出我的应用程序。

def split[A](p: Process[Task, A], limit: Int = 10): Process[Task, (Process[Task, A], Process[Task, A])] = {
  val left = async.boundedQueue[A](limit)
  val right = async.boundedQueue[A](limit)

  val enqueue = p.observe(left.enqueue).observe(right.enqueue).drain.onHalt { cause =>
    Process.await(Task.gatherUnordered(Seq(left.close, right.close))){ _ => Halt(cause) }
  }
  val dequeue = Process((left.dequeue, right.dequeue))

  enqueue merge dequeue
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接