RxJs:zip运算符的有损形式

8
考虑使用zip运算符将两个无限的Observables压缩在一起,其中一个以双倍频率发出项。
当前实现是无损的,即如果我让这些Observables发出一个小时,然后在它们的发射速率之间切换,第一个Observable最终会赶上另一个Observable。
这将导致内存爆炸,因为缓冲区变得越来越大。
如果第一个Observable发出项几个小时,而第二个Observable在最后发出一项,也会发生同样的情况。

如何实现此操作符的有损行为? 我只想在从两个流中获得发射时进行发射,并且我不关心来自更快流的发射数量。

澄清:

  • 我试图解决的主要问题是由于zip运算符的无损性质而导致的内存爆炸问题。
  • 我希望在从两个流中获得发射时进行发射,即使两个流每次都发出相同的值

示例:

Stream1: 1 2    3 4    5 6 7                
Stream2:     10     20       30 40 50 60 70

常规的zip将产生以下输出:

[1, 10]
[2, 20]
[3, 30]
[4, 40]
[5, 50]
[6, 60]
[7, 70]

const Observable = Rx.Observable;
const Subject = Rx.Subject;


const s1 = new Subject();
const s2 = new Subject();

Observable.zip(s1,s2).subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30); 
 
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>

我想要它产生的输出:
[1, 10]
[3, 20]
[5, 30]

解释:
有损 zip 操作符是带有缓冲区大小为 1zip。这意味着它只会保留从第一个发出的流中发出的第一项,并将丢失其余所有项(即在第一项和第二个流的第一次发射之间到达的项)。因此,在示例中发生的情况如下:stream1 发出 1,有损 zip "记住" 它并忽略了 stream1 上的所有项目,直到 stream2 发出。 stream2 的第一个发射是 10,因此 stream1 丢失了 2。在相互发射后(有损 zip 的第一次发射),它重新开始:"记住" 3,"丢失" 4,发出 [3,20]。然后重新开始:"记住" 5,"丢失" 67,发出 [5,30]。然后重新开始:"记住" 40,"丢失" 506070 并等待 stream1 上的下一项。

示例 2:

Stream1: 1 2 3 ... 100000000000
Stream2:                        a

这种情况下,普通的zip操作符会导致内存溢出。
我不希望发生这种情况。

摘要:
基本上,我期望有损的zip操作符只记住流1前一次相互发射之后发出的第一个值,并在流2赶上流1时发出。然后重复。


看一下 combineLatest - martin
“combineLatest” 不是我想要的。它只要其中一个流发出信号就会发出。我需要的是每当这两个流都发出信号时都会发出信号。基本上,我希望使用缓冲区大小为1的“zip”运算符。 - JeB
5个回答

10
以下内容可以实现您所需的功能:
Observable.zip(s1.take(1), s2.take(1)).repeat()

在 RxJs 5.5+ 的管道语法中:
zip(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

Rx.Observable.zip(s1.take(1), s2.take(1)).repeat()
    .subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);  
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>

解释:

  • repeat 操作符(目前的实现方式)在源 observable 完成后重新订阅,即在此特定情况下它会在每次相互发射时重新订阅 zip
  • zip 组合两个 observables 并等待它们都发出。 combineLatest 也可以,因为有 take(1),所以并没有什么区别。
  • take(1) 实际上处理了内存爆炸并定义了有损行为。

如果你想要在相互发射时从每个流中获取最后一个值而不是第一个值,请使用以下内容:

Observable.combineLatest(s1, s2).take(1).repeat()

RxJs 5.5+ 的管道语法中:

combineLatest(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

Rx.Observable.combineLatest(s1,s2).take(1).repeat()
    .subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);  
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>


没错,你说得对,它不应该发出[7, 40]这一对。如果运算符在相互发射的第一个项目中发射了40的这一对,那么此时流中还没有这个值。我会修复这个例子。 - JeB
[7,40]与其他一些对有何不同之处? - Richard Matsen
有损的 zip 操作符是带有缓冲区大小为 1zip。这意味着它只会保留从第一个发出的流中发出的第一项,并将丢失所有其余项(即在第一项和第二个流的第一次发射之间到达的项)。因此,在示例中发生的情况如下:stream1 发出 1,有损的 zip "记住" 它并忽略 stream1 上的所有项目,直到 stream2 发出。stream2 的第一个发射是 10,所以 stream1 丢失了 2。在相互发射之后(有损的 zip 的第一次发射),它重新开始... - JeB
谢谢,那解释得很清楚。也许将其添加到问题中会很有用。 - Richard Matsen
1
RxJS 5.5+获取最后一个的语法应该不是take而是combineLatest(s1, s2).pipe(take(1), repeat())。从每个外部observable中获取一个将产生与使用zip获取一个相同的效果。因此,我认为在这两种情况下都可以使用combineLatest。请参见以下内容。 https://stackblitz.com/edit/rxjs-4jae8r - Trevor Karjanis
显示剩余2条评论

1
我认为以下内容应该始终从每个源Observable中获取最后一个值。
const source1 = Observable.interval(1000).publish();
const source2 = Observable.interval(300).publish();

source1.connect();
source2.connect();

Observable.defer(() => Observable.forkJoin(
        source1.takeUntil(source2.skipUntil(source1)),
        source2.takeUntil(source1.skipUntil(source2))
    ))
    .take(1)
    .repeat()
    .subscribe(console.log);

现场演示:http://jsbin.com/vawewew/11/edit?js,console

这将打印:

[ 0, 2 ]
[ 1, 5 ]
[ 2, 8 ]
[ 3, 12 ]
[ 4, 15 ]
[ 5, 18 ]

你可能需要将source1source2转换为热Observable(如果它们还不是的话)。
编辑:
核心部分是source1.takeUntil(source2.skipUntil(source1))。这会从source1中获取值,直到source2发出。但同时,它将忽略source1,直到source2至少发出一个值为止 :). forkJoin() Observable会等待两个源完成,同时记住它们各自的最后一次发射。
然后我们想要重复这个过程,所以我们使用take(1)来完成链式操作,使用.repeat()立即重新订阅。

1
如果你不想从开头开始发射,你需要将源转换为“热”可观测对象。 - martin
1
现在几乎清楚了。“然后我们想要重复这个过程,所以我们使用take(1)来完成链式操作,.repeat()来立即重新订阅” - 这也解释了defer运算符。但是为什么repeat会“立即重新订阅”呢?根据文档:“生成一个可观察的序列,使用指定的调度程序发送观察者消息,重复指定次数给定的元素”。它是如何重新订阅的? - JeB
1
我承认那是一个非常令人困惑的描述。它似乎重复先前的发射,但实际上并不是这样。事实上,在接收到“complete”通知后,它只是重新订阅其源Observable。您可以在此处查看https://github.com/ReactiveX/rxjs/blob/master/src/operators/repeat.ts#L52 - martin
1
@meltedspark 不是这样的。它总是只从每个源 Observable 中发出第一个项目。而使用 forkJoin 则会发出最后一个。 - martin
1
我认为它会,但我没有测试过。 - martin
显示剩余10条评论

1
这将生成序列 [ 0, 2 ] [ 1, 5 ] [ 2, 8 ] [ 3, 12 ] ...。
const interval1 = Rx.Observable.interval(1000)
const interval2 = Rx.Observable.interval(300)

const combined = Rx.Observable.combineLatest(interval1, interval2);
const fresh = combined.scan((acc, x) => { 
    return x[0] === acc[0] || x[1] === acc[1] ? acc : x 
  })
  .distinctUntilChanged() //fresh ones only

fresh.subscribe(console.log);

使用可能更少的运算符。不确定它的效率如何。
CodePen

对于更新#3

然后您需要为每个源项设置一个键。

// Simulated sources according to latest spec provided (update #3)
const source1 = Rx.Observable.from(['x','y','z'])
const source2 = Rx.Observable.from(['a','a','b','b','c'])

// Create keys for sources
let key1 = 0
let key2 = 0
const keyed1 = source1.map(x => [x, key1++])
const keyed2 = source2.map(x => [x, key2++])

const combined = Rx.Observable
  .combineLatest(keyed1, keyed2)
  .map(([keyed1, keyed2]) => [...keyed1, ...keyed2]) // to simplify scan below
combined.subscribe(console.log) // not the output, for illustration only
console.log('-------------------------------------')

const fresh = combined.scan((acc, x) => { 
    return x[1] === acc[1] || x[3] === acc[3] ? acc : x 
  })
  .distinctUntilChanged() //fresh ones only

const dekeyed = fresh
  .map(keyed => { return [keyed[0], keyed[2]] })
dekeyed.subscribe(console.log); // required output

这会产生

["x", "a"]  
["y", "a"]  
["z", "b"]  

CodePen (打开控制台后请刷新CodePen页面,以获得更好的显示效果)


我认为这个解决方案比我的好。 - martin
但受到您的skipUntil()启发 - Richard Matsen
这解决了一个完全不同的问题。你的解决方案不会在每次两个流都发出值时都发出信号,而是在两个流发出与之前不同的值时才发出信号。在我的情况下,我不想使用distinctUntilChanged。即使它们相同,我也希望发出值。假设两个可观察对象每次都发出1。第一个发出了10次,第二个发出了5次。我想获得5个发射,而不“记住”5个冗余的发射。希望现在清楚了。 - JeB
在问题中添加了这个问题的澄清。 - JeB
是的,现在(在您添加了密钥之后)它将起作用。唯一的问题是,它解决了“forkJoin”运算符(取最新值)而不是“zip”(取第一个)的问题。与@martin的答案相比,这也变得相当复杂。我已经用具体示例更新了问题。 - JeB

0

你提到了缓冲区大小为1,不知道使用缓冲区大小为1来压缩两个ReplaySubjects是否可行?


我不能假设生产者的“Subject”是什么类型。当然,我可以使用“ReplaySubject”包装源“Observables”,但这并没有帮助。 “ReplaySubject”的目的完全不同:基本上它会向任何观察者发出源“Observable”发出的所有项目,而不管观察者何时订阅。 - JeB
基本上,它向任何观察者发出源Observable发出的所有项目 - 在缓冲区大小的限制下,不是吗?所以我认为压缩速率将是最慢源的速率。我可以为您进行测试,但恐怕现在已经是晚上了。 - Richard Matsen

0
我为了清晰度添加了另一个答案,因为它是在已接受的答案之后(但基于我的先前答案)。
请原谅我如果我理解错了,但我期望该解决方案能处理切换发射频率:
然后我在它们的发射速率之间进行切换,
提供的测试直到第一个流停止后才切换发射速率。
Stream1: 1 2    3 4    5 6 7                 
Stream2:     10     20    30 40 50 60 70

所以我尝试了另一个测试

Stream1: 1 2      3 4     5 6
Stream2:    10 20    30 40   50 60

该流的测试数据为:

s1.next(1); s1.next(2); s2.next(10); s2.next(20); s1.next(3); s1.next(4);
s2.next(30); s2.next(40); s1.next(5); s1.next(6);  s2.next(50); s2.next(60);

据我理解,被接受的答案未通过此测试。
它输出
[1, 10]
[3, 20]
[4, 30]
[5, 40]
[6, 50]

而我期望看到的是

[1, 10]
[3, 30]
[5, 50]

如果运算符是对称的(可交换的?)

完善我的先前回答

这个解决方案是基于基本运算符构建的,因此可能更容易理解。我无法确定它的效率,也许会在另一个迭代中进行测试。

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

const tagged1 = s1.map(x=>[x,1])
const tagged2 = s2.map(x=>[x,2])
const merged = tagged1.merge(tagged2)
const fresh = merged.scan((acc, x) => { 
    return x[1] === acc[1] ? acc : x 
  })
  .distinctUntilChanged() //fresh ones only
const dekeyed = fresh.map(keyed => keyed[0])
const paired = dekeyed.pairwise()
let index = 0
const sequenced = paired.map(x=>[x,index++])
const alternates = sequenced.filter(x => x[1] % 2 === 0)
const deindexed = alternates.map(x=>x[0])

如果需要,也可以以更紧凑的形式呈现

let index = 0
const output = 
  s1.map(x=>[x,1]).merge(s2.map(x=>[x,2])) // key by stream id
  .scan((acc, x) => { 
    return x[1] === acc[1] ? acc : x 
  })
  .distinctUntilChanged()       //fresh ones only
  .map(keyed => keyed[0])       // de-key
  .pairwise()                   // pair
  .map(x=>[x,index++])          // add a sequence no
  .filter(x => x[1] % 2 === 0)  // take even sequence
  .map(x=>x[0])                 // deindex

为了测试,CodePen(打开控制台后刷新CodePen页面,以获得更好的显示效果)


感谢您投入时间,但“在发射速率之间切换”只是为了强调当前无损行为而在问题中提到。这是我描述问题的部分,而不是问题本身。我认为我已经非常清楚地说明了我的问题,不是吗?示例和澄清不够清楚吗? - JeB
同时,如果您能解释一下为什么期望看到这个输出(就像我在问题示例中所做的那样),那将会很有帮助。因为目前来看,第一个输出比第二个更合乎逻辑。 - JeB
问题是损失压缩应每次两个流发出时都发出,而不记住中间的项目。在您的示例中,第一次两个流发出的时间是当第一个发出了1、丢失了2并且第二个发出了10。这很好。但是下一次它们同时发出的时间是第二个发出20和第一个发出3之后。在每次相互发射之后,它都开始等待下一次相互发射。或者我没有理解您的观点? - JeB
好的,这是你的问题定义,所以我不会太多地反对它 :)。 - Richard Matsen
我的主要观点是(针对我的新测试),如果stream1发出1并丢弃2,那么为了对称性,当stream2取得领先地位时,它也会这样做(发出它的第一个并且放弃所有后续操作,直到另一个流发出)。 - Richard Matsen
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接