假设我有两个可能是无限的流:
s1 = a..b..c..d..e...
s2 = 1.2.3.4.5.6.7...
我希望将流合并,然后使用稍微慢一些的异步操作来映射已合并的流(例如,在Bacon中使用fromPromise
和flatMapConcat
)。
我可以使用merge
将它们组合在一起:
me = a12b3.c45d6.7e...
接着进行映射
s1 = a..b..c..d..e...
s2 = 1.2.3.4.5.6.7...
me = a12b3.c45d6.7e...
mm = a..1..2..b..3..c..4..5..
正如您所看到的,贪心s2
流在长期内得到了优势。 这种行为是不希望的。
合并行为是不可取的,因为我想要一些反压机制来实现更多交替、"公平"、"轮询"的合并。以下是一些期望的行为示例:
s1 = a.....b..............c...
s2 = ..1.2.3..................
mm = a...1...b...2...3....c...
s1 = a.........b..........c...
s2 = ..1.2.3..................
mm = a...1...2...b...3....c...
一种思考方式是,s1
和s2
将任务发送给只能处理一项任务的工作线程。使用 merge
和 flatMapConcat
,我将得到一个贪婪的任务管理器,但我想要更加公平的解决方案。
我希望找到一个简单而优雅的解决方案。如果可以轻松地为任意数量的流进行推广,那就太好了。
// roundRobinPromiseMap(streams: [Stream a], f: a -> Promise b): Stream b
var mm = roundRobinPromiseMap([s1, s2], slowAsyncFunc);
使用RxJS或其他Rx库的解决方案也可以。
澄清
不是zipAsArray
我不想要:
function roundRobinPromiseMap(streams, f) {
return Bacon.zipAsArray.apply(null, streams)
.flatMap(Bacon.fromArray)
.flatMapConcat(function (x) {
return Bacon.fromPromise(f(x));
});
}
比较示例弹珠图:
s1 = a.....b..............c.......
s2 = ..1.2.3......................
mm = a...1...b...2...3....c....... // wanted
zip = a...1...b...2........c...3... // zipAsArray based
是的,我会遇到缓冲问题
...但使用简单的不公平的方式也会遇到同样的问题:
function greedyPromiseMap(streams, f) {
Bacon.mergeAll(streams).flatMapConcat(function (x) {
return Bacon.fromPromise(f(x));
});
}
大理石图
s1 = a.........b..........c...
s2 = ..1.2.3..................
mm = a...1...2...b...3....c...
merge = a...1...2...3...b....c...
bufferWithTimeOrCount
这样的东西。Bacon建议在您的情况下,从推式(push)转向拉式(pull),使用可迭代对象(iterable)而不是流(streams) - 相关的Bacon问题引用了Eric Meijer的话,他说你不应该使用可观察对象(observables)来处理快速产生数据源和慢速消费者的情况 - 我倾向于同意这个看法。我可以编写一个生成合并流的答案,但老实说,我不太理解您的交错逻辑 - 如果您能澄清一下,我可以概括一下要点。 - Benjamin Gruenbaumme
和mm
之间使用concatMap
。在这种情况下,是的,s2
会发挥作用。但是,您可以使用flatMap
来允许并发。我认为在Bacon中,这是flatMapWithConcurrencyLimit()
。另一种解决方案是分别在s1
上flatMap
承诺,并在s2
上flatMap
承诺,而不合并,因为合并在任何情况下都具有"或"语义。取决于您想要实现什么。 - André Staltz