Akka Stream:Unzip和Broadcast之间有什么区别?

3

我正在尝试实现类似这样的效果:

graph

我正在尝试使用Flow.fromGraph创建此流程。
  • 我可以使用Zip[B, C]完成join,它需要两个流
  • 我可以通过以下两种方式完成split
    • 使用Broadcast[A](2)
    • 使用UnZip[(A,A)],在其前面加上.map(a -> (a, a))

map(f1)map(f2)均为自定义流,我是从包含的库中获取的,因此我无法修改它们,请不要说.map(a => (f1(a), f2(a)))

这两种情况之间有什么区别,还是等效的?我发现唯一不同的是Broadcast仅能在所有下游都取消时(eagerCancel = false)才能取消,这是其默认行为,而UnZip则与广播使用eagerCancel = true相同。

此外,如果其中任何一个路径失败会发生什么情况?也就是说,如果对于特定的元素,f1抛出错误,会有什么影响?
此外,假设我们没有f2函数(因此没有映射操作),并且我们想在最后发出(b,a),那么是否应该将f2替换为身份流,或者可以完全跳过它?(如果后者,您会使用身份流吗?)
val split = builder.add(BroadCast[A](2))
val join = builder.add(Zip[B, A])
val F1: Flow[A, B, NotUsed] = Flow[A].map(f1)
val I = Flow[A].map(identity)

split ~> F1 ~> join.in1
split ~> /* I ~> */ join.in0 // do i need the commented part?

也许这可以帮助内部缓冲区/反压力?

也许 FlowWithContext 可以帮助这里,但此时此刻它仍在开发中,并且还不足以处理这个问题... 此外,关于 FlowWithContext 的文档和示例也不是很多。 - Codi
类似这样:https://gist.github.com/davideicardi/d3b383e5945a44252931582f83ecadc2 - Codi
1个回答

1
他们都是Fanout运算符;然而
从文档中可以看到Unzip:
接收一个由两个元素组成的流,将这两个元素拆分为两个不同的下游流。
而Broadcast:
将每个输入元素发射到n个输出中的每个输出。
因此我们可以得出结论,Unzip只是一个n = 2的Broadcast;但重要的是,如果元素是元组,Broadcast将只创建相同元组的n个输出。Unzip将为元素A和B分别创建2个输出。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接