如何在subscribe方法内取消RXJS订阅?

17

我有一些JavaScript:

this.mySubscription = someObservable.subscribe((obs: any) => {
   this.mySubscription.unsubscribe();
   this.mySubscription = undefined;
}

执行时,控制台记录错误ERROR TypeError: Cannot read property 'unsubscribe' of undefined

我想知道为什么我无法在subscribe Lambda函数内取消订阅。是否有正确的方法来这样做?我已经读了一些关于使用虚拟主题(dummy-subjects)并完成它们或使用takeUntil/takeWhile和其他管道操作的workArounds的文章。

有没有一种正确的方法或解决方法可以在订阅的subscribe函数内取消订阅?

我当前正在使用一个虚拟订阅,像这样:

mySubscription: BehaviorSubject<any> = new BehaviorSubject<any>(undefined);


// when I do the subscription:
dummySubscription: BehaviorSubject<any> = new BehaviourSubject<any>(this.mySubscription.getValue());
this.mySubscription = someObservable.subscribe((obs: any) => {
    // any work...
    dummySubscription.next(obs);
    dummySubscription.complete();
    dummySubscription = undefined;
}, error => {
    dummySubscription.error(error);
});

dummySubscription.subscribe((obs: any) => {
    // here the actual work to do when mySubscription  emits a value, before it should have been unsubscribed upon
}, err => {
    // if errors need be
});
3个回答

33

subscribe函数中不应尝试取消订阅。
你可以使用像taketakeWhiletakeUntil等操作符来取消订阅。

take

使用take(n)someObservable发出n次后取消订阅。

someObservable.pipe(
  take(1)
).subscribe(value => console.log(value));

takeWhile

使用takeWhile在发出的值不符合条件时取消订阅。

someObservable.pipe(
  takeWhile(value => valueIsSave(value))
).subscribe(value => console.log(value));

valueIsSave(value): boolean {
  // return true if the subscription should continue
  // return false if you want to unsubscribe on that value
}

takeUntil

使用 takeUntil(obs$) 当 observable obs$ 发出时取消订阅。

const terminate = new Subject();

someObservable.pipe(
  takeUntil(terminate)
).subscribe(value => console.log(value));

unsub() { 
  terminate.next() // trigger unsubscribe
}

取消订阅的条件是复杂的,取决于发出的值。 - MojioMS
1
@MojioMS 听起来 takeWhile 应该适合你。 - frido
在测试不同的rxjs管道操作符时,我因为意外多次订阅而感到困惑。然而,takeWhile做到了我想要的效果。 - MojioMS
takeWhile 是我正在寻找的,谢谢 Frido。 - Fernando Gabrieli

2

受这里的另一个回答和特别是这篇文章的启发,https://medium.com/@benlesh/rxjs-dont-unsubscribe-6753ed4fda87,我想提出以下示例使用 takeUntil()

...
    let stop$: Subject<any> = new Subject<any>(); // This is the one which will stop the observable ( unsubscribe a like mechanism )

    obs$
      .pipe(
        takeUntil(stop$)
      )
      .subscribe(res => {
        if ( res.something === true ) {
          // This next to lines will cause the subscribe to stop
          stop$.next();
          stop$.complete();
        }

      });
...

我想引用上述文章标题中的句子RxJS:不要取消订阅 :)


2
如果您将流设置为异步,则您正在执行的操作应该可以正常工作。例如,以下内容将无法正常工作:
const sub = from([1,2,3,4,5,6,7,8,9,10]).subscribe(val => {
  console.log(val);
  if(val > 5) sub.unsubscribe();
});

但这会起作用:

但这会起作用:

const sub2 = from([1,2,3,4,5,6,7,8,9,10]).pipe(
  delay(0)
).subscribe(val => {
  console.log(val);
  if(val > 5) sub2.unsubscribe();
});

因为JS事件循环是相当可预测的(代码块总是运行到完成),如果你的流的任何部分是异步的,那么你可以确定在调用lambda回调之前,你的订阅将被定义。
你应该这样做吗?
可能不应该。如果你的代码依赖于语言/编译器/解释器等的内部(否则隐藏的)机制,你就创建了脆弱的代码和/或难以维护的代码。下一个查看我的代码的开发人员会困惑为什么有一个delay(0) - 它看起来像什么也不应该做。
请注意,在subscribe()中,你的lambda可以访问它的闭包以及当前的流变量。 takeWhile()操作符也可以访问相同的闭包和相同的流变量。
from([1,2,3,4,5,6,7,8,9,10]).pipe(
  takeWhile(val => {
    // add custom logic
    return val <= 5;
  })
).subscribe(val => {
  console.log(val);
});

takeWhile() 可以做到 sub = subscribe(... sub.unsubscibe() ... ) 的所有功能,并且额外的好处是不需要你管理一个订阅对象,而且更易于阅读和维护。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接