如何防止 AsyncSubject 在最后一个观察者取消订阅时完成。

4
AsyncSubject 在最后一个观察者取消订阅主题时变为可观察对象。以下是引用:(链接)

完成之后就完成了。在取消订阅、完成或出错后,主题不能被重复使用。

这里是演示:

const ofObservable = Rx.Observable.of(1, 2, 3);
const subject = new Rx.AsyncSubject();

ofObservable.subscribe(subject);

subject.subscribe((v) => {
    console.log(v);
});

subject.unsubscribe((v) => {
    console.log(v);
});

// here I'll get the error "object unsubscribed"
subject.subscribe((v) => {
    console.log(v);
});

如何防止主题完成?
有一个“共享”运算符:
在RxJS 5中,运算符“share()”创建一个热的、refCounted的可观察对象,可以在失败时重试或在成功时重复。由于主题一旦出错、完成或取消订阅就无法重复使用,因此“share()”运算符将回收死亡主题以使结果可观测对象能够重新订阅。
这就是我要找的。但“共享”会创建一个主题,而我需要“AsyncSubject”。

嗯,谢谢,检查一下这个 Plunker,那里没有其他内容。 - Max Koretskyi
1
对我来说,它看起来有点像打字错误。将 subject.subscribe 调用的结果分配给一个变量(结果是一个订阅),并在订阅上调用 unsubscribe - 而不是主题。通过这个更改,这个 plunk 看起来做了我期望它做的事情。 - cartant
@cartant,谢谢,将其作为答案发布。我还想知道当我取消订阅主题时会发生什么?由于后者已经完成,因此应该已经从“of”可观察对象中取消订阅,那么当我从主题中取消订阅时会发生什么?另外,这是否意味着我引用的引语仅适用于普通主题? - Max Koretskyi
我真的不知道在主题上调用“unsubscribe”时发生了什么。这很有趣,所以我会研究一下,并稍后撰写答案。我想引用你问题中的引语是否适用取决于你对重复使用的定义。我的理解是订阅者可以在异步主题完成后订阅它,但异步主题不能订阅另一个源可观察对象。 - cartant
好的,很酷,期待你的答案。 - Max Koretskyi
显示剩余2条评论
1个回答

14
问题出在这行代码:
subject.unsubscribe((v) => {
    console.log(v);
});

Subject 实现了 ISubscription; 这意味着它有一个unsubscribe方法和一个closed属性。其unsubscribe的实现如下:

unsubscribe() {
  this.isStopped = true;
  this.closed = true;
  this.observers = null;
}

这有点残酷。基本上,它会切断与主题的所有订阅者的通信,而不会取消他们的订阅。同样地,它也不会将主题本身从任何可能被订阅的可观察对象中取消订阅。(它还标记了主题为关闭/停止状态,这就是您出错的原因。)
考虑到它实际上并没有执行任何取消订阅的操作,它应该如何使用并不清楚。this test的描述如下:
it('should disallow new subscriber once subject has been disposed', () => {

这表明它可能是RxJS 4的一种副作用,其中取消订阅被称为处理。

无论出现的原因是什么,我建议永远不要调用它。例如,看看这个片段:

const source = Rx.Observable
  .interval(200)
  .take(5)
  .do(value => console.log(`source: ${value}`));

const subject = new Rx.Subject();
source.subscribe(subject);

const subscription = subject
  .switchMap(() => Rx.Observable
    .interval(200)
    .take(5)
    .delay(500))
  .subscribe(value => console.log(`subscription: ${value}`));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

它订阅一个主题到源可观察对象,然后订阅从该主题组成的可观察对象。
如果在主题上调用了“unsubscribe”,则会出现一些问题:
- 源对主题的“next”方法进行调用时,主题对源的订阅未被取消并产生错误; - 对于从主题组成的可观察对象的订阅没有被取消,因此在进行“unsubscribe”调用后,“switchMap”中的“interval”可观察对象仍然会发出。
试试看:

const source = Rx.Observable
  .interval(200)
  .take(5)
  .do(value => console.log(`source: ${value}`));

const subject = new Rx.Subject();
source.subscribe(subject);

const subscription = subject
  .switchMap(() => Rx.Observable
    .interval(200)
    .take(5)
    .delay(500))
  .subscribe(value => console.log(`subscription: ${value}`));

setTimeout(() => {
  console.log("subject.unsubscribe()");
  subject.unsubscribe();
}, 700);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

这些行为似乎都不可取,因此应避免在 Subject 上调用 unsubscribe 方法。

相反,您的代码片段应使用由 subscribe 调用返回的 Subscription 进行取消订阅:

const subscription = subject.subscribe((v) => {
  console.log(v);
});
subscription.unsubscribe();

在撰写本答案后,我发现Ben Lesh以下评论与我的理论相符,即它与主题的处理有关:

如果你想让主题在完成其有用任务后,当你在其旁边调用next时大声且愤怒地出错,你可以直接在主题实例上调用unsubscribe


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接