如何获取 RxJS Subject 或 Observable 的当前值?

328

我有一个 Angular 2 的服务:

import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject}    from 'rxjs/Subject';

@Injectable()
export class SessionStorage extends Storage {
  private _isLoggedInSource = new Subject<boolean>();
  isLoggedIn = this._isLoggedInSource.asObservable();
  constructor() {
    super('session');
  }
  setIsLoggedIn(value: boolean) {
    this.setItem('_isLoggedIn', value, () => {
      this._isLoggedInSource.next(value);
    });
  }
}

一切运作良好。但我还有另一个组件不需要订阅,它只需要在某个时间点获取isLoggedIn的当前值。我该怎么做?

13个回答

529

SubjectObservable 并没有当前值。当一个值被发射时,它会传递给订阅者并且该 Observable 完成了它的使命。

如果你想要一个当前值,可以使用专为此目的设计的 BehaviorSubject。它会保留最后发射的值,并立即将其发送给新的订阅者。

它还有一个方法 getValue() 用于获取当前值。


167
RxJS 5 的作者重要提示:使用getValue()是一个巨大的警示,表示你正在做一些错误的事情。它只是一个逃生口。通常情况下,你在使用 RxJS 时应该以声明式的方式进行操作。getValue() 是命令式的。如果你使用 getValue(),有99.9% 的可能性是你正在做一些错误或奇怪的事情。 - Ben Lesh
5
如果我有一个 BehaviorSubject<boolean>(false) 并且想要切换它,该怎么办? - bob
3
@GünterZöchbauer 我的意思是:当发出一个新的布尔值时,我需要知道当前值以切换它。在这种情况下,使用 getValue() 是否合理? - bob
13
@BenLesh 的 getValue() 方法非常有用,比如在执行瞬时操作时可以使用,例如 onClick->dispatchToServer(x.getValue()),但不要在可观察链中使用它。 - FlavorScape
4
@IngoBürk 我能想到的唯一解释是你不知道 foo$ 是一个 BehaviorSubject(即它被定义为 Observable,可能是热或冷的来自延迟源)。但是,既然你接着在 $foo 本身上使用了 .next,那么你的实现就依赖于它一个 BehaviorSubject,所以没有理由不直接使用 .value 来获取当前值。 - Simon_Weaver
显示剩余10条评论

229

订阅是唯一的从Observable/Subject获取值的方式!

如果你使用getValue(),那么你正在声明式范例中执行命令式操作。它存在作为一种应急措施,但99.9%的情况下不应该使用getValue() getValue()有一些有趣的功能:如果主题已取消订阅,它会抛出错误;如果主题已死亡,因为出现了错误,它将防止你获取值等。但是,它确实是为罕见情况而设立的应急措施。

有多种以“Rx-y”方式从Subject或Observable中获取最新值的方法:

  1. 使用BehaviorSubject:但要实际订阅它。当你第一次订阅BehaviorSubject时,它会同步发送它接收到的前一个值或初始化的值。
  2. 使用ReplaySubject(N):这将缓存N个值,并将它们重新传递给新的订阅者。
  3. A.withLatestFrom(B):使用此运算符在A发出时从ObservableB获取最新值。将会用一个数组[a, b]给你两个值。
  4. A.combineLatest(B):使用此运算符每次AB发出时获取最新的AB值。将用一个数组给你两个值。
  5. shareReplay():通过ReplaySubject使Observable获得多播能力,但允许您在发生错误时重试Observable。(基本上它会给你那种promise-y的缓存行为)。
  • publishReplay(), publishBehavior(initialValue), multicast(subject: BehaviorSubject | ReplaySubject)等操作符:这些操作符都利用了BehaviorSubjectReplaySubject。它们是相同的事物的不同变体,基本上通过将所有通知通过主题汇集来组播源可观察对象。您需要调用connect()使用主题订阅源。

  • 37
    为什么使用 getValue() 是一个危险信号?如果我只需要在用户点击按钮的时候获取一次可观察对象的当前值,那该怎么办?我应该订阅值并立即取消订阅吗? - Flame
    11
    有时候这样做没问题,但通常表明代码作者正在进行一些非常强制性的操作(通常具有副作用)而他们不应该这样做。您也可以使用click$.mergeMap(() => behaviorSubject.take(1))来解决您的问题。 - Ben Lesh
    3
    如果你只是想简单地分派当前值(比如在点击时将其发送到服务器),使用 getValue() 非常方便。但是在链式可观察操作符中不要使用它。 - FlavorScape
    7
    我有一个AuthenticationService,它使用BehaviorSubject来存储当前的登录状态(布尔值truefalse)。它向想要了解状态变化的订阅者公开了一个isLoggedIn$可观察对象。它还公开了一个get isLoggedIn()属性,通过在底层的BehaviorSubject上调用getValue()返回当前的登录状态 - 这被我的身份验证守卫用于检查当前状态。对我来说,这似乎是getValue()的合理用法...? - Dan King
    1
    @BenLesh执行click$.mergeMap(...需要将click$作为可观察对象。因此,您必须将模板变量放置在元素上,通过ViewChild将其传递到类中,使用fromEvent将其nativeElement转换为Observable...看起来像什么?没错,就是getElementByIdaddEventListener :D 在现代UI框架中,OMG...我认为使用BehaviorSubject.value是较小的邪恶 - Nabi Isakhanov
    显示剩余4条评论

    21

    1
    在我的 Angular4 应用中,以下更改帮助了我 - 我必须将订阅从组件构造函数移到 ngOnInit() 中(该组件跨路由共享),在这里留下以防有人遇到类似问题。 - Luca
    1
    我在使用Angular 5应用程序时遇到了一个问题,我正在使用服务从API获取值并在不同的组件中设置变量。我使用了Subjects / Observables,但在路由更改后它无法推送值。ReplaySubject是Subject的替代品,解决了所有问题。 - jafaircl
    2
    请确保使用ReplaySubject(1),否则新的订阅者将按顺序获取先前发出的每个值 - 这在运行时并不总是明显。 - Drenai
    据我了解,ReplaySubject(1)的行为与BehaviorSubject()相同。 - Kfir Erez
    4
    有所不同,最大的区别在于当ReplaySubject被订阅时,如果其next()函数尚未被调用,它不会立即发出默认值,而BehaviourSubject会。当BehaviourSubject在服务中被用作数据源可观察对象时,立即发出非常方便用于将默认值应用于视图。请注意,这里不是解释,只是翻译。 - Drenai

    10
    const observable = of('response')
    
    function hasValue(value: any) {
      return value !== null && value !== undefined;
    }
    
    function getValue<T>(observable: Observable<T>): Promise<T> {
      return observable
        .pipe(
          filter(hasValue),
          first()
        )
        .toPromise();
    }
    
    const result = await getValue(observable)
    // Do the logic with the result
    // .................
    // .................
    // .................
    

    你可以在这里查看如何实现它的完整文章。 https://www.imkrish.com/blog/development/simple-way-get-value-from-observable


    正是我在寻找的。谢谢! - rsmets
    很遗憾,链接已过期。我使用了 getValue(),这个解决方案解决了我的问题。谢谢! - Svmurvj
    请查阅firstValueFrom。这将使您无需自己编写函数,从而节省大量时间。 - koraljko

    3

    在子组件中,我遇到了同样的问题,最初它必须具有Subject的当前值,然后订阅Subject以便监听更改。我只需在服务中维护当前值,以便组件可以访问,例如:

    import {Storage} from './storage';
    import {Injectable} from 'angular2/core';
    import {Subject}    from 'rxjs/Subject';
    
    @Injectable()
    export class SessionStorage extends Storage {
    
      isLoggedIn: boolean;
    
      private _isLoggedInSource = new Subject<boolean>();
      isLoggedIn = this._isLoggedInSource.asObservable();
      constructor() {
        super('session');
        this.currIsLoggedIn = false;
      }
      setIsLoggedIn(value: boolean) {
        this.setItem('_isLoggedIn', value, () => {
          this._isLoggedInSource.next(value);
        });
        this.isLoggedIn = value;
      }
    }
    

    需要当前值的组件只需从服务中获取它,即:

    sessionStorage.isLoggedIn
    

    不确定这是否是正确的做法 :)


    这段话与IT技术无关。

    3
    如果您需要在组件视图中使用可观察对象的值,您可以直接使用async管道。 - Ingo Bürk

    3
    一个看起来类似的答案被踩了。但是我认为对于有限的情况下,我提出的建议是可以被证明合理的。
    虽然可观察对象没有当前值,但通常会有一个立即可用的值。例如,在Redux/Flux/Akita存储中,您可能会基于多个可观察对象请求数据,并且该值通常会立即可用。
    如果是这种情况,那么当您订阅时,该值将立即返回。
    所以让我们假设你调用了一个服务,在完成后,你想从存储中获取某个最新值,它有可能不会发出:
    你可以尝试这样做(并且应该尽可能地将事情“留在管道内”):
     serviceCallResponse$.pipe(withLatestFrom(store$.select(x => x.customer)))
                         .subscribe(([ serviceCallResponse, customer] => {
    
                            // we have serviceCallResponse and customer 
                         });
    

    这样做的问题在于它会阻塞,直到次要可观测对象发出一个值,这可能永远不会发生。
    我最近发现自己需要仅在立即有一个值可用时评估可观测对象,并且更重要的是我需要能够检测它是否不可用。我最终采取了以下措施:
     serviceCallResponse$.pipe()
                         .subscribe(serviceCallResponse => {
    
                            // immediately try to subscribe to get the 'available' value
                            // note: immediately unsubscribe afterward to 'cancel' if needed
                            let customer = undefined;
    
                            // whatever the secondary observable is
                            const secondary$ = store$.select(x => x.customer);
    
                            // subscribe to it, and assign to closure scope
                            sub = secondary$.pipe(take(1)).subscribe(_customer => customer = _customer);
                            sub.unsubscribe();
    
                            // if there's a delay or customer isn't available the value won't have been set before we get here
                            if (customer === undefined) 
                            {
                               // handle, or ignore as needed
                               return throwError('Customer was not immediately available');
                            }
                         });
    

    请注意,以上所有内容我都使用subscribe来获取值(正如@Ben所讨论的那样)。即使我有一个BehaviorSubject,也不使用.value属性。

    1
    顺便提一下,这是因为默认情况下,“schedule”使用当前线程。 - Simon_Weaver

    2
    虽然这听起来有些过度,但这只是另一种“可能”的解决方案,可以保持Observable类型并减少样板代码...
    您可以创建一个扩展getter来获取Observable的当前值。
    要做到这一点,您需要在一个global.d.ts类型声明文件中扩展Observable接口。然后在observable.extension.ts文件中实现扩展getter,并最终将类型声明和扩展文件包含在您的应用程序中。
    您可以参考这个StackOverflow Answer了解如何将扩展包含到您的Angular应用程序中。
    // global.d.ts
    declare module 'rxjs' {
      interface Observable<T> {
        /**
         * _Extension Method_ - Returns current value of an Observable.
         * Value is retrieved using _first()_ operator to avoid the need to unsubscribe.
         */
        value: Observable<T>;
      }
    }
    
    // observable.extension.ts
    Object.defineProperty(Observable.prototype, 'value', {
      get <T>(this: Observable<T>): Observable<T> {
        return this.pipe(
          filter(value => value !== null && value !== undefined),
          first());
      },
    });
    
    // using the extension getter example
    this.myObservable$.value
      .subscribe(value => {
        // whatever code you need...
      });
    

    2
    有两种方法可以实现这一点。
    1. BehaviorSubject有一个getValue()方法,您可以在特定时间点获取该值。

    2. 您可以直接订阅BehaviorSubject,并将订阅的值传递给类成员、字段或属性。

    我不建议使用这两种方法。
    在第一种方法中,您可以随时使用方便的方法获取值,您可以将其称为该时间点的当前快照。问题是,在您的代码中可能会引入竞争条件,您可能会在许多不同的位置和不同的时间调用此方法,这很难调试。
    大多数开发人员使用第二种方法来订阅原始值,您可以跟踪订阅并确定何时取消订阅以避免进一步的内存泄漏,如果您真的想将其绑定到变量并且没有其他方式与其交互,那么您可以使用此方法。
    我建议再次查看您的用例,您在哪里使用它?例如,当您调用任何API时,您想确定用户是否已登录,您可以将其与其他可观察对象结合使用。
    const data$ = apiRequestCall$().pipe(
     // Latest snapshot from BehaviorSubject.
     withLatestFrom(isLoggedIn),
     // Allow call only if logged in.
     filter(([request, loggedIn]) => loggedIn)
     // Do something else..
    );
    

    有了这个,你可以直接使用它来处理UI,在angular中可以通过管道data$ | async来实现。


    2

    可以创建一个订阅,然后在获取第一个发出的项目后销毁它。在下面的示例中,pipe()是一个使用Observable作为输入并返回另一个Observable作为输出的函数,同时不修改第一个observable。

    样本使用Angular 8.1.0"rxjs": "6.5.3""rxjs-observable": "0.0.7"创建。

      ngOnInit() {
    
        ...
    
        // If loading with previously saved value
        if (this.controlValue) {
    
          // Take says once you have 1, then close the subscription
          this.selectList.pipe(take(1)).subscribe(x => {
            let opt = x.find(y => y.value === this.controlValue);
            this.updateValue(opt);
          });
    
        }
      }
    

    0
    你可以将最后发出的值单独存储在Observable之外,需要时再读取它。
    let lastValue: number;
    
    const subscription = new Service().start();
    subscription
        .subscribe((data) => {
            lastValue = data;
        }
    );
    

    4
    在响应式编程中,将一些数据存储在可观测对象之外并不是一种积极的做法。相反,你应该尽可能让更多的数据流进可观测流中。 - ganqqwerty

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接