我正在尝试实现类似这样的效果:
我正在尝试使用Flow.fromGraph
创建此流程。
- 我可以使用
Zip[B, C]
完成join
,它需要两个流 - 我可以通过以下两种方式完成
split
:- 使用
Broadcast[A](2)
- 使用
UnZip[(A,A)]
,在其前面加上.map(a -> (a, a))
- 使用
map(f1)
和map(f2)
均为自定义流,我是从包含的库中获取的,因此我无法修改它们,请不要说.map(a => (f1(a), f2(a)))
这两种情况之间有什么区别,还是等效的?我发现唯一不同的是Broadcast
仅能在所有下游都取消时(eagerCancel = false
)才能取消,这是其默认行为,而UnZip
则与广播使用eagerCancel = true
相同。
此外,假设我们没有
f2
函数(因此没有映射操作),并且我们想在最后发出(b,a)
,那么是否应该将f2替换为身份流,或者可以完全跳过它?(如果后者,您会使用身份流吗?)val split = builder.add(BroadCast[A](2))
val join = builder.add(Zip[B, A])
val F1: Flow[A, B, NotUsed] = Flow[A].map(f1)
val I = Flow[A].map(identity)
split ~> F1 ~> join.in1
split ~> /* I ~> */ join.in0 // do i need the commented part?
也许这可以帮助内部缓冲区/反压力?