考虑使用zip运算符将两个无限的Observables压缩在一起,其中一个以双倍频率发出项。
当前实现是无损的,即如果我让这些Observables发出一个小时,然后在它们的发射速率之间切换,第一个Observable最终会赶上另一个Observable。
这将导致内存爆炸,因为缓冲区变得越来越大。
如果第一个Observable发出项几个小时,而第二个Observable在最后发出一项,也会发生同样的情况。
我想要它产生的输出:
解释:
有损
当前实现是无损的,即如果我让这些Observables发出一个小时,然后在它们的发射速率之间切换,第一个Observable最终会赶上另一个Observable。
这将导致内存爆炸,因为缓冲区变得越来越大。
如果第一个Observable发出项几个小时,而第二个Observable在最后发出一项,也会发生同样的情况。
如何实现此操作符的有损行为? 我只想在从两个流中获得发射时进行发射,并且我不关心来自更快流的发射数量。
澄清:
- 我试图解决的主要问题是由于
zip
运算符的无损性质而导致的内存爆炸问题。 - 我希望在从两个流中获得发射时进行发射,即使两个流每次都发出相同的值
示例:
Stream1: 1 2 3 4 5 6 7
Stream2: 10 20 30 40 50 60 70
常规的zip
将产生以下输出:
[1, 10]
[2, 20]
[3, 30]
[4, 40]
[5, 50]
[6, 60]
[7, 70]
const Observable = Rx.Observable;
const Subject = Rx.Subject;
const s1 = new Subject();
const s2 = new Subject();
Observable.zip(s1,s2).subscribe(console.log);
s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);
s2.next(40); s2.next(50); s2.next(60); s2.next(70);
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
[1, 10]
[3, 20]
[5, 30]
解释:
有损
zip
操作符是带有缓冲区大小为 1
的 zip
。这意味着它只会保留从第一个发出的流中发出的第一项,并将丢失其余所有项(即在第一项和第二个流的第一次发射之间到达的项)。因此,在示例中发生的情况如下:stream1
发出 1
,有损 zip "记住" 它并忽略了 stream1
上的所有项目,直到 stream2
发出。 stream2
的第一个发射是 10
,因此 stream1
丢失了 2
。在相互发射后(有损 zip
的第一次发射),它重新开始:"记住" 3
,"丢失" 4
,发出 [3,20]
。然后重新开始:"记住" 5
,"丢失" 6
和 7
,发出 [5,30]
。然后重新开始:"记住" 40
,"丢失" 50
,60
,70
并等待 stream1
上的下一项。
示例 2:
Stream1: 1 2 3 ... 100000000000
Stream2: a
这种情况下,普通的zip
操作符会导致内存溢出。
我不希望发生这种情况。
摘要:
基本上,我期望有损的zip
操作符只记住流1
在前一次相互发射之后发出的第一个值,并在流2
赶上流1
时发出。然后重复。
combineLatest
。 - martin