Angular/RxJS 我应该什么时候取消订阅 `Subscription`?

1031

我应该在何时存储 Subscription 实例并在 ngOnDestroy 生命周期期间调用 unsubscribe(),何时可以简单地忽略它们?

将所有订阅保存会在组件代码中引入很多混乱。

HTTP 客户端指南 可以像这样忽略订阅:

getHeroes() {
  this.heroService.getHeroes()
                  .subscribe(
                     heroes => this.heroes = heroes,
                     error =>  this.errorMessage = <any>error);
}

同时,路由与导航指南指出:

最终我们会导航到其他地方。路由将从DOM中删除并销毁这个组件。在此之前,我们需要清理自己。具体来说,在Angular销毁组件之前,我们必须取消订阅。否则可能会导致内存泄漏。

我们在ngOnDestroy方法中取消订阅 Observable

private sub: any;

ngOnInit() {
  this.sub = this.route.params.subscribe(params => {
     let id = +params['id']; // (+) converts string 'id' to a number
     this.service.getHero(id).then(hero => this.hero = hero);
   });
}

ngOnDestroy() {
  this.sub.unsubscribe();
}

32
我想订阅http-requestsSubscription可以被忽略,因为它们只调用一次onNext,然后调用onComplete。相反,Router会重复地调用onNext,并且可能永远不会调用onComplete(不确定......)。对于从Event获得的Observable也是如此。所以我猜应该将它们取消订阅。 - Robert P
1
@gt6707a 流的完成(或未完成)独立于对其完成的任何观察。提供给订阅函数的回调(观察者)不确定是否分配资源。潜在地分配上游资源的是对subscribe的调用本身。 - seangwright
在你的 typescript 中,将明确取消订阅作为一种“肌肉记忆”。即使是 http 订阅也要这样做。例如:如果你的 Http.get() 在响应上完成。如果你的服务器 API 花费了 10 秒 来响应,并且你的组件在调用后的 5 秒 内被销毁,那么你的响应将在组件销毁之后的 5 秒 到达。这将触发一个上下文不正确的执行,这比 Angular 文档中指出的内存泄漏部分更糟糕。 - Avid Coder
1
@unk33k 能否分享文档的确切链接?抱歉,我找不到那一部分。 - Tyiliyra
29个回答

4

由于seangwright的解决方案(第三次编辑)似乎非常有用,我也发现将此功能打包到基本组件中很麻烦,并提示其他项目团队成员记得在ngOnDestroy上调用super()以激活此功能。

这个答案提供了一种摆脱super调用的方法,并使“componentDestroyed $”成为基本组件的核心。

class BaseClass {
    protected componentDestroyed$: Subject<void> = new Subject<void>();
    constructor() {

        /// wrap the ngOnDestroy to be an Observable. and set free from calling super() on ngOnDestroy.
        let _$ = this.ngOnDestroy;
        this.ngOnDestroy = () => {
            this.componentDestroyed$.next();
            this.componentDestroyed$.complete();
            _$();
        }
    }

    /// placeholder of ngOnDestroy. no need to do super() call of extended class.
    ngOnDestroy() {}
}

然后您可以自由地使用此功能,例如:
@Component({
    selector: 'my-thing',
    templateUrl: './my-thing.component.html'
})
export class MyThingComponent extends BaseClass implements OnInit, OnDestroy {
    constructor(
        private myThingService: MyThingService,
    ) { super(); }

    ngOnInit() {
        this.myThingService.getThings()
            .takeUntil(this.componentDestroyed$)
            .subscribe(things => console.log(things));
    }

    /// optional. not a requirement to implement OnDestroy
    ngOnDestroy() {
        console.log('everything works as intended with or without super call');
    }

}

4
官方的编辑#3答案(和变体)效果很好,但让我困扰的是可观察订阅周围的业务逻辑“混乱”。以下是另一种使用包装器的方法。引用:“警告:实验性代码”。文件subscribeAndGuard.ts用于创建一个新的Observable扩展来包装.subscribe(),并在其中包装ngOnDestroy()。使用方式与.subscribe()相同,除了额外的第一个参数引用组件。
import { Observable } from 'rxjs/Observable';
import { Subscription } from 'rxjs/Subscription';

const subscribeAndGuard = function(component, fnData, fnError = null, fnComplete = null) {

  // Define the subscription
  const sub: Subscription = this.subscribe(fnData, fnError, fnComplete);

  // Wrap component's onDestroy
  if (!component.ngOnDestroy) {
    throw new Error('To use subscribeAndGuard, the component must implement ngOnDestroy');
  }
  const saved_OnDestroy = component.ngOnDestroy;
  component.ngOnDestroy = () => {
    console.log('subscribeAndGuard.onDestroy');
    sub.unsubscribe();
    // Note: need to put original back in place
    // otherwise 'this' is undefined in component.ngOnDestroy
    component.ngOnDestroy = saved_OnDestroy;
    component.ngOnDestroy();

  };

  return sub;
};

// Create an Observable extension
Observable.prototype.subscribeAndGuard = subscribeAndGuard;

// Ref: https://www.typescriptlang.org/docs/handbook/declaration-merging.html
declare module 'rxjs/Observable' {
  interface Observable<T> {
    subscribeAndGuard: typeof subscribeAndGuard;
  }
}

这里有一个组件,它有两个订阅,一个带有包装器,一个不带。唯一的注意事项是它必须实现OnDestroy(如果需要,则为空),否则Angular不知道调用包装版本。

import { Component, OnInit, OnDestroy } from '@angular/core';
import { Observable } from 'rxjs/Observable';
import 'rxjs/Rx';
import './subscribeAndGuard';

@Component({
  selector: 'app-subscribing',
  template: '<h3>Subscribing component is active</h3>',
})
export class SubscribingComponent implements OnInit, OnDestroy {

  ngOnInit() {

    // This subscription will be terminated after onDestroy
    Observable.interval(1000)
      .subscribeAndGuard(this,
        (data) => { console.log('Guarded:', data); },
        (error) => { },
        (/*completed*/) => { }
      );

    // This subscription will continue after onDestroy
    Observable.interval(1000)
      .subscribe(
        (data) => { console.log('Unguarded:', data); },
        (error) => { },
        (/*completed*/) => { }
      );
  }

  ngOnDestroy() {
    console.log('SubscribingComponent.OnDestroy');
  }
}

这里提供一个演示用的plunker:链接

另外注意: 在“官方”解决方案中,可以通过在订阅之前使用takeWhile()代替takeUntil(),以及使用简单的布尔变量而不是另一个Observable来简化代码并在ngOnDestroy()中处理。

@Component({...})
export class SubscribingComponent implements OnInit, OnDestroy {

  iAmAlive = true;
  ngOnInit() {

    Observable.interval(1000)
      .takeWhile(() => { return this.iAmAlive; })
      .subscribe((data) => { console.log(data); });
  }

  ngOnDestroy() {
    this.iAmAlive = false;
  }
}

4

为了避免内存泄漏,出于性能方面的考虑,建议取消订阅你的可观察对象订阅。有不同的方法可以做到这一点,

顺便说一下,我阅读了大多数答案,没有找到有人谈论 async 管道,这是在 Angular 应用中使用推荐的Rxjs 模式,因为它提供了自动订阅和取消订阅的功能,当离开将被销毁的组件时:

请找到一个实现示例

app.component.ts:

import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';

import { BookService } from './book.service';
import { Book } from './book';

@Component({
   selector: 'app-observable',
   templateUrl: './observable.component.html'
})
export class AppComponent implements OnInit { 
   books$: Observable<Book[]>
   constructor(private bookService: BookService) { }
   ngOnInit(): void {
        this.books$ = this.bookService.getBooksWithObservable();
   }
} 

app.compoennt.html:

<h3>AsyncPipe with Promise using NgFor</h3>
<ul>
  <li *ngFor="let book of books$ | async" >
    Id: {{book?.id}}, Name: {{book?.name}}
  </li>
</ul>

3

根据@seangwright的回答,我编写了一个处理组件中"无限"可观察对象订阅的抽象类:

import { OnDestroy } from '@angular/core';
import { Subscription } from 'rxjs/Subscription';
import { Subject } from 'rxjs/Subject';
import { Observable } from 'rxjs/Observable';
import { PartialObserver } from 'rxjs/Observer';

export abstract class InfiniteSubscriberComponent implements OnDestroy {
  private onDestroySource: Subject<any> = new Subject();

  constructor() {}

  subscribe(observable: Observable<any>): Subscription;

  subscribe(
    observable: Observable<any>,
    observer: PartialObserver<any>
  ): Subscription;

  subscribe(
    observable: Observable<any>,
    next?: (value: any) => void,
    error?: (error: any) => void,
    complete?: () => void
  ): Subscription;

  subscribe(observable: Observable<any>, ...subscribeArgs): Subscription {
    return observable
      .takeUntil(this.onDestroySource)
      .subscribe(...subscribeArgs);
  }

  ngOnDestroy() {
    this.onDestroySource.next();
    this.onDestroySource.complete();
  }
}

要使用它,只需在您的Angular组件中扩展它并按以下方式调用subscribe()方法:

this.subscribe(someObservable, data => doSomething());

如常使用错误和完成回调、观察对象或完全没有回调。如果在子组件中实现了该方法,记得调用super.ngOnDestroy()

这里还有Ben Lesh提供的额外参考:RxJS: Don’t Unsubscribe


3
如果需要取消订阅,可以使用以下操作符来进行 observable 管道方法。
import { Observable, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
import { OnDestroy } from '@angular/core';

export const takeUntilDestroyed = (componentInstance: OnDestroy) => <T>(observable: Observable<T>) => {
  const subjectPropertyName = '__takeUntilDestroySubject__';
  const originalOnDestroy = componentInstance.ngOnDestroy;
  const componentSubject = componentInstance[subjectPropertyName] as Subject<any> || new Subject();

  componentInstance.ngOnDestroy = (...args) => {
    originalOnDestroy.apply(componentInstance, args);
    componentSubject.next(true);
    componentSubject.complete();
  };

  return observable.pipe(takeUntil<T>(componentSubject));
};

它可以像这样使用:

import { Component, OnDestroy, OnInit } from '@angular/core';
import { Observable } from 'rxjs';

@Component({ template: '<div></div>' })
export class SomeComponent implements OnInit, OnDestroy {

  ngOnInit(): void {
    const observable = Observable.create(observer => {
      observer.next('Hello');
    });

    observable
      .pipe(takeUntilDestroyed(this))
      .subscribe(val => console.log(val));
  }

  ngOnDestroy(): void {
  }
}

这个操作符包装了组件的ngOnDestroy方法。

重要提示:这个操作符应该是可观察管道中的最后一个。


这个非常有效,但是升级到Angular 9似乎让它失效了。有人知道为什么吗? - ymerej

3

通常在组件被销毁时需要取消订阅,但是随着Angular的发展,越来越多的功能将由Angular自动处理。例如,在新版本的Angular4中,他们增加了关于路由取消订阅的部分:

您需要取消订阅吗?

如“路由和导航”页面中“ActivatedRoute: the one-stop-shop for route information”一节所述,路由器管理它提供的可观察对象并本地化订阅。当组件被销毁时,订阅将被清理,以防止内存泄漏,因此您不需要从路由paramMap Observable中取消订阅。

下面的示例是Angular创建组件并在销毁后销毁的好示例。请注意组件如何实现OnDestroy。如果您需要OnInit,则也可以在组件中实现它,例如:implements OnInit, OnDestroy

import { Component, Input, OnDestroy } from '@angular/core';  
import { MissionService } from './mission.service';
import { Subscription }   from 'rxjs/Subscription';

@Component({
  selector: 'my-astronaut',
  template: `
    <p>
      {{astronaut}}: <strong>{{mission}}</strong>
      <button
        (click)="confirm()"
        [disabled]="!announced || confirmed">
        Confirm
      </button>
    </p>
  `
})

export class AstronautComponent implements OnDestroy {
  @Input() astronaut: string;
  mission = '<no mission announced>';
  confirmed = false;
  announced = false;
  subscription: Subscription;

  constructor(private missionService: MissionService) {
    this.subscription = missionService.missionAnnounced$.subscribe(
      mission => {
        this.mission = mission;
        this.announced = true;
        this.confirmed = false;
    });
  }

  confirm() {
    this.confirmed = true;
    this.missionService.confirmMission(this.astronaut);
  }

  ngOnDestroy() {
    // prevent memory leak when component destroyed
    this.subscription.unsubscribe();
  }
}

4
我有点困惑,您(指Angular的最近文档/笔记)似乎在说Angular会处理这个问题,但随后又确认取消订阅是一个好的模式。谢谢。 - jamie

3

我使用一个名为"Unsubscriber"的类来处理订阅。

这里是Unsubscriber类的代码:

export class Unsubscriber implements OnDestroy {
  private subscriptions: Subscription[] = [];

  addSubscription(subscription: Subscription | Subscription[]) {
    if (Array.isArray(subscription)) {
      this.subscriptions.push(...subscription);
    } else {
      this.subscriptions.push(subscription);
    }
  }

  unsubscribe() {
    this.subscriptions
      .filter(subscription => subscription)
      .forEach(subscription => {
        subscription.unsubscribe();
      });
  }

  ngOnDestroy() {
    this.unsubscribe();
  }
}

"最初的回答" 翻译成英文是 "Original answer". 您可以在任何组件/服务/效果等中使用这个类。

例如:

class SampleComponent extends Unsubscriber {
    constructor () {
        super();
    }

    this.addSubscription(subscription);
}

3

我尝试了seangwright的解决方案(编辑3)。

但对于通过timer或interval创建的Observable无效。

然而,我使用了另一种方法使其工作:

import { Component, OnDestroy, OnInit } from '@angular/core';
import 'rxjs/add/operator/takeUntil';
import { Subject } from 'rxjs/Subject';
import { Subscription } from 'rxjs/Subscription';
import 'rxjs/Rx';

import { MyThingService } from '../my-thing.service';

@Component({
   selector: 'my-thing',
   templateUrl: './my-thing.component.html'
})
export class MyThingComponent implements OnDestroy, OnInit {
   private subscriptions: Array<Subscription> = [];

  constructor(
     private myThingService: MyThingService,
   ) { }

  ngOnInit() {
    const newSubs = this.myThingService.getThings()
        .subscribe(things => console.log(things));
    this.subscriptions.push(newSubs);
  }

  ngOnDestroy() {
    for (const subs of this.subscriptions) {
      subs.unsubscribe();
   }
 }
}

这是最好的、最可靠的和最简单的方法。 - Oleg

2

除上述情况外,另一个需要注意的情况是:

  • 当订阅的流中的新值不再需要或无关紧要时,请始终取消订阅,这将减少触发次数,并在某些情况下提高性能。例如,在组件中,订阅的数据/事件不再存在,或者需要订阅新的流(如刷新等)时,可以考虑取消订阅。

2

SubSink包,简单而一致的取消订阅解决方案

由Ward Bell创建的Subsink包是一个很好的解决方案,可以帮助您取消订阅。https://github.com/wardbell/subsink#readme

我在一个项目中使用它,并且我们有几个开发人员都在使用它。它有助于拥有一种在每种情况下都能正常工作的一致方式。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接