RxJS - 使用Subject订阅停止Observable

3

我创建了一个Observable,它可以生成无限量的数据(例如计时器)。这些数据通过subject进行访问,因此多个观察者将接收相同的值。

如何停止Observable生成新值?(不修改Observable的实现)

// custom Observable, to visualize internal behavior
const timer$ = new rxjs.Observable(subscriber => {
  console.log("observable init");

  var counter = 0;
  const intervalId = setInterval(() => {
    ++counter;
    console.log("observable %s",counter);
    subscriber.next(counter);
  }, 1000);

  return () => {
    console.log("observable teardown");
    clearTimeout(intervalId);
  }
});

// subscribe through a subject
const subject$ = new rxjs.Subject();
timer$.subscribe(subject$);

const subscription = subject$.subscribe(value => console.log("observer %s", value));

// cancel subscription
setTimeout(() => {
  console.log("unsubscribe observer");
  subscription.unsubscribe();
  // TODO how to stop Observable generating new values?
}, 3000);

jsfiddle: https://jsfiddle.net/gy4tfd5w/

3个回答

3

幸运的是,在RxJS中有一种专门而优雅的方法来解决这个问题。

你的要求是

多个观察者[...]接收相同的值

这被称为多播可观察对象,有一些操作符用于从普通“冷”可观察对象创建一个多播可观察对象。

例如,您可以直接将源可观察对象传递到share操作符中,而不是直接创建Subject实例,它将为您创建Subjectshare的文档如下:

返回一个新的Observable,该Observable多播(共享)原始Observable。只要至少有一个Subscriber,此Observable就会被订阅并发出数据。当所有订阅者取消订阅时,它将从源Observable中取消订阅。

最后一句显示了sharesource$.subscribe(subject)之间的微妙差别。使用share时,会保留所谓的refCount,当没有剩余订阅者时,自动从其源取消订阅Subject

应用于您的代码,如下所示:

const timer$ = 
    new rxjs.Observable(subscriber => {// your unchanged implementation})
    .pipe(
        share()
    );

const subscription = timer$.subscribe(value => console.log("observer %s", value));

这是带有你的示例代码的完整版本:

https://jsfiddle.net/50q746ad/

顺便说一下,share 并不是唯一执行多播的运算符。这里有很棒的学习资源,可以更深入地了解这个主题。

0
这是我解决在流中停止一个疯狂的可观察对象的方法。
通过使用信号Subject和rxjs运算符:takeUntil
示例
const stopSignal$ = new Subject();

infinitelyGenerating$
  .pipe(takeUntil(stopSignal$))
  .subscribe(val => {
    if (val === 'bad') {
      stopSignal$.next()
    }
  })

-2

经过一番艰苦的研究,我为这个问题添加了自己的npm库。

通过无需添加任何额外的卷积变量和易于使用,改进了以前的答案。

enter image description here


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接