RxJS: takeUntil() Angular组件的ngOnDestroy()

81
< p >< em >tl;dr:我的意思是我想将Angular的ngOnDestroy和Rxjs的takeUntil()操作符合并在一起。--这是否可行?

我有一个Angular组件,它打开了几个Rxjs订阅。当组件被销毁时,这些订阅需要被关闭。

这个问题的一个简单解决方案是:

class myComponent {

  private subscriptionA;
  private subscriptionB;
  private subscriptionC;

  constructor(
    private serviceA: ServiceA,
    private serviceB: ServiceB,
    private serviceC: ServiceC) {}

  ngOnInit() {
    this.subscriptionA = this.serviceA.subscribe(...);
    this.subscriptionB = this.serviceB.subscribe(...);
    this.subscriptionC = this.serviceC.subscribe(...);
  }

  ngOnDestroy() {
    this.subscriptionA.unsubscribe();
    this.subscriptionB.unsubscribe();
    this.subscriptionC.unsubscribe();
  }

}

这个方法可以正常工作,但有点冗余。我特别不喜欢的是: - unsubscribe() 在其他地方,因此必须记住它们是相互关联的。 - 组件状态被订阅污染了。

我更喜欢使用takeUntil() 操作符或类似的方法,使代码看起来像这样:

class myComponent {

  constructor(
    private serviceA: ServiceA,
    private serviceB: ServiceB,
    private serviceC: ServiceC) {}

  ngOnInit() {
    const destroy = Observable.fromEvent(???).first();
    this.subscriptionA = this.serviceA.subscribe(...).takeUntil(destroy);
    this.subscriptionB = this.serviceB.subscribe(...).takeUntil(destroy);
    this.subscriptionC = this.serviceC.subscribe(...).takeUntil(destroy);
  }

}

是否有一个销毁事件或类似的东西,可以让我使用 takeUntil() 或其他简化组件架构的方式?我意识到我可以在构造函数中自己创建一个事件,或者在 ngOnDestroy() 中触发一个事件,但最终这并没有使阅读更加简单。

9个回答

106

你可以使用一个 ReplaySubject 来实现这个目的:

编辑:从 RxJS 6.x 开始有所不同: 注意使用 pipe() 方法。

class myComponent {
  private destroyed$: ReplaySubject<boolean> = new ReplaySubject(1);

  constructor(
    private serviceA: ServiceA,
    private serviceB: ServiceB,
    private serviceC: ServiceC) {}

  ngOnInit() {
    this.serviceA
      .pipe(takeUntil(this.destroyed$))
      .subscribe(...);
    this.serviceB
      .pipe(takeUntil(this.destroyed$))
      .subscribe(...);
    this.serviceC
      .pipe(takeUntil(this.destroyed$))
      .subscribe(...);
  }

  ngOnDestroy() {
    this.destroyed$.next(true);
    this.destroyed$.complete();
  }
}

这仅适用于 RxJS 5.x 及更早版本:

class myComponentOld {
  private destroyed$: ReplaySubject<boolean> = new ReplaySubject(1);

  constructor(private serviceA: ServiceA) {}

  ngOnInit() {
    this.serviceA
      .takeUntil(this.destroyed$)
      .subscribe(...);
  }

  ngOnDestroy() {
    this.destroyed$.next(true);
    this.destroyed$.complete();
  }
}

5
在某种程度上,这不是我想要的 - 我想避免在组件中创建额外的状态工件(destroyed$)并从ngOnDestroy触发它。但是经过更多的查看后,我意识到没有语法糖可以避免这个问题。尽管如此,这绝对已经是一个比存储所有订阅更好的解决方案了。谢谢! - marius
2
Angular团队一直在讨论如何使组件中的销毁事件对rxjs更容易访问,但据我所知目前还没有实现。 - olsn
7
我会考虑使用 new ReplaySubject(1)。这样你的组件将保持在销毁状态,并确保所有内容都已完成。除此之外,回答得很好 :) - Dorus
22
replaySubject 会帮助组件在 ngOnDestroy 已经被调用后,保持在一个已销毁的状态。任何延迟订阅都将立即从 replaySubject 中触发重放值并完成。请注意,这里的翻译只是为了让内容更加通俗易懂,并未改变原意。 - Dorus
2
@Isac,你上面的评论已经足够解释了,不是吗? - QBrute
显示剩余3条评论

26

使用npm包@w11k/ngx-componentdestroyed中的componentDestroyed()函数是使用takeUntil的最简单方法:

@Component({
  selector: 'foo',
  templateUrl: './foo.component.html'
})
export class FooComponent implements OnInit, OnDestroy {
  ngOnInit() {
    Observable.interval(1000)
      .takeUntil(componentDestroyed(this)) // <--- magic is here!
      .subscribe(console.log);
  }

  ngOnDestroy() {}
}

这是一个componentDestroyed()的版本,可以直接包含在你的代码中:

// Based on https://www.npmjs.com/package/ng2-rx-componentdestroyed
import { OnDestroy } from '@angular/core';
import { ReplaySubject } from 'rxjs/ReplaySubject';

export function componentDestroyed(component: OnDestroy) {
  const oldNgOnDestroy = component.ngOnDestroy;
  const destroyed$ = new ReplaySubject<void>(1);
  component.ngOnDestroy = () => {
    oldNgOnDestroy.apply(component);
    destroyed$.next(undefined);
    destroyed$.complete();
  };
  return destroyed$;
}

链接指向项目的弃用版本。新版本在 https://www.npmjs.com/package/@w11k/ngx-componentdestroyed 上。 - Moritz Ringler
3
这种方法的问题在于,你现在必须扩展某个基类。 - Felipe Issa
3
为什么要使用 ReplaySubject?- 我通常使用普通的 Subject。 - belzebubele
是的,你是对的。在大多数情况下,一个常规的“Subject”就足够了。 - Rene Hamburger

15

好的,这主要取决于您对关闭订阅的含义。基本上有两种方法:

  1. 使用完成链的运算符(例如 takeWhile())。
  2. 取消订阅源 Observable。

需要知道的是这两者并不相同。

例如使用 takeWhile() 时,您会使该运算符发送 complete 通知,该通知会传播到观察者。因此,如果您定义:

...
.subscribe(..., ..., () => doWhatever());

当你使用takeWhile()完成链式调用后,doWhatever()函数将会被调用。

例如,它可能会像这样:

const Observable = Rx.Observable;
const Subject = Rx.Subject;

let source = Observable.timer(0, 1000);
let subject = new Subject();

source.takeUntil(subject).subscribe(null, null, () => console.log('complete 1'));
source.takeUntil(subject).subscribe(null, null, () => console.log('complete 2'));
source.takeUntil(subject).subscribe(null, null, () => console.log('complete 3'));

setTimeout(() => {
  subject.next();
}, 3000);

3秒后将调用所有已完成的回调函数。

另一方面,当你取消订阅时,你是在说你对源 Observable 产生的项目不再感兴趣。然而,这并不意味着源必须完成。你只是不再关心了。

这意味着你可以收集所有 .subscribe(...) 调用中的 Subscription,然后一次性退订它们:

let subscriptions = new Rx.Subscription();
let source = Observable.timer(0, 1000);

subscriptions.add(source.subscribe(null, null, () => console.log('complete 1')));
subscriptions.add(source.subscribe(null, null, () => console.log('complete 2')));
subscriptions.add(source.subscribe(null, null, () => console.log('complete 3')));

setTimeout(() => {
  subscriptions.unsubscribe();
}, 3000);

现在经过3秒的延迟,因为我们已经取消订阅并且没有调用完成回调函数,所以不会有任何内容打印到控制台。

所以你想使用什么取决于你和你的使用情况。只要注意取消订阅与完成不同,尽管我猜在你的情况下,这并不是很重要。


或许值得注意的是,不应该在组件内部直接启动必须立即完成的流。任何关键操作都应通过一个服务来完成,以免被路由更改等导致销毁。 - olsn
到目前为止,我在这种情况下实际上还没有遇到过许多完成的流,因为大多数都是开放式的,并且组件只会在某个时刻停止侦听。但是,出于原则考虑,我肯定认为取消订阅可能是更好的模式,因为这是逻辑上应该发生的事情。我会考虑一下。谢谢! - marius
1
考虑在流上使用 takeUntil(Rx.Observable.timer(3000))。确实,使用 takeUntil 你会 complete,而使用 unsubscribe 你会 cancel - Dorus

6

请使用多态性与TakeUntil一起使用(2022年4月13日)

如果您在编写每个组件时都使用protected destroy$ = new Subject<void>();,那么您应该问自己,“为什么我没有遵循DRY(不要重复自己)原则呢?”

为了遵循DRY原则,请创建一个抽象的基本组件(抽象类不能直接实例化),该组件处理您的销毁信号。

@Component({ template: '' })
export abstract class BaseComponent extends Subscribable {
  // Don't let the outside world trigger this destroy signal.
  // It's only meant to be trigger by the component when destroyed! 
  private _destroy = new Subject<void>();
  public destroy$ = this._destroy as Observable<void>;
  /** Lifecycle hook called by angular framework when extended class dies. */
  ngOnDestroy(): void {
    this._destroy.next();
  }
}

编写一个方便的扩展函数以简化操作。

declare module 'rxjs/internal/Observable' {
  interface Observable<T> {
    dieWith(comp: BaseComponent): Observable<T>;
  }
}

Observable.prototype.dieWith = function<T>(comp: BaseComponent): Observable<T> {
    return this.pipe(takeUntil(comp.destroy$));
};

每当您需要处理订阅时,请扩展您的BaseComponent。

@Component({ ... })
export class myComponent extends BaseComponent {

  constructor(
    private serviceA: ServiceA,
    private serviceB: ServiceB,
    private serviceC: ServiceC
  ) {
    super();
  }

  ngOnInit() {
    this.serviceA.a$.dieWith(this).subscribe(...);
    this.serviceB.b$.dieWith(this).subscribe(...);
    this.serviceC.c$.dieWith(this).subscribe(...);
  }

}

你已经像专业人士一样在Angular组件中处理了订阅。

你的同事以后会感谢你!

祝编码愉快!


5
Angular 16提供了一个新的takeUntilDestroyed函数,可以像以下方式在构造函数中使用
import { Component } from "@angular/core";
import { takeUntilDestroyed } from "@angular/core/rxjs-interop";

@Component({
  selector: "my-component",
  templateUrl: "./my-component.html",
  styleUrls: ["./my-component.scss"]
})
export class MyComponent {

  constructor(private http: HttpClient) {
     this.http.get('/api')
       .pipe(takeUntilDestroyed())
       .subscribe();
  }
}

注意,如果您试图在构造函数之外执行相同操作,可能会出现以下错误:takeUntilDestroyed() 只能在注入上下文中使用,例如构造函数。要解决此问题,请更新为以下内容

import { Component, DestroyRef, OnInit, inject } from "@angular/core";
import { takeUntilDestroyed } from "@angular/core/rxjs-interop";

@Component({
  selector: "my-component",
  templateUrl: "./my-component.html",
  styleUrls: ["./my-component.scss"]
})
export class MyComponent implements OnInit {
  destroyedRef = inject(DestroyRef);

  ngOnInit(): void {
     this.http.get('/api')
       .pipe(takeUntilDestroyed(this.destroyedRef))
       .subscribe();
  }
}

4

如果您的组件直接与路由相关联,您可以通过利用Router事件takeUntil()来避免添加状态。这样,一旦您从组件中导航离开,它将自动为您清理其订阅。

import { Component, OnInit } from '@angular/core';
import { ActivatedRoute, Router } from '@angular/router';
import { MyService } from './my.service';
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/operator/takeUntil';

@Component({
    ...
})
export class ExampleComopnent implements OnInit {

    constructor(private router: Router, private myService: MyService) {
    }

    ngOnInit() {
        this.myService.methodA()
            .takeUntil(this.router.events)
            .subscribe(dataA => {
                ...
            });

        this.myService.methodB()
            .takeUntil(this.router.events)
            .subscribe(dataB => {
                ...
            });
    }
}

注意: 这个简单的例子没有考虑到受保护路由或取消路由导航的情况。如果有可能触发路由器事件但路由导航被取消,您需要在路由器事件上进行过滤,以便在适当的时候触发它——例如,在路由守卫检查后或完成导航后。

this.myService.methodA()
    .takeUntil(this.router.events.filter(e => e instanceOf NavigationEnd))
    .subscribe(dataA => {
        ...
    });

4
创建一个基类。
import { Subject } from 'rxjs/Rx';
import { OnDestroy } from '@angular/core';

 export abstract class Base implements OnDestroy {

 protected componentDestroyed$: Subject<any>;

constructor() {
    this.componentDestroyed$ = new Subject<void>();

    const destroyFunc = this.ngOnDestroy;
    this.ngOnDestroy = () => {
        destroyFunc.bind(this)();
        this.componentDestroyed$.next();
        this.componentDestroyed$.complete();
    };
}
// placeholder of ngOnDestroy. no need to do super() call of extended class.
public ngOnDestroy() {
    // no-op
}

}

这个组件需要继承基类(Base class)。

export class Test extends Base{
}

当您订阅时。
service.takeUntil(this.componentDestroyed$
    .subscribe(...)

这是一个全局级别的修改,无论何时您想要订阅都需在整个项目中使用相同的方法。如果需要进行任何更改,您可以在基类中进行修改。

这个能用吗?我在箭头函数里面放了一个 console.log,就在 this.componentDestroyed$.next(); 的前一行,但它从未被调用。 - mtpultz

2

takeUntliDestroyed是Angular中的一个函数,详情请见文档链接。由于它目前仍处于开发者预览版,因此我建议暂时不要使用它。但毫无疑问,这很快就会成为正确的答案。 - jemand771

1
自Angular 16以来,有一个新运算符可以自动取消订阅和释放资源:takeUntilDestoryed。
示例:
import { takeUntilDestroyed } from 'angular/core/rxjs-interop';

@Component({...})
export class AppComponent implements OnInit {

  destroyRef = inject(DestroyRef);
  
  constructor(private http: HttpClient) {}

  public ngOnInit(): void {
    this.http.get('/api').pipe(takeUntilDestroyed(this.destroyRef)).subscribe();
  }
}

文档:https://angular.io/api/core/rxjs-interop/takeUntilDestroyed


你给出的例子会引发这个错误,因此被投了反对票:“takeUntilDestroyed()只能在注入上下文中使用,比如构造函数”。 - el-davo
好观点,我没注意到。我插入了DestroyRef,也可以通过将引用传递给运算符来使用它。 - Andrei Fara

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接