创建一次性订阅

238

我需要创建一个订阅一个 Observable 的过程,当第一次调用它时就立即将其处置。

是否有类似以下代码:

observable.subscribeOnce(func);

我的使用情况是,在 Express 路由处理程序中创建一个订阅,并且每个请求会多次调用该订阅。

7个回答

424

如果您只想观察第一个值,那么请使用first()take(1)

observable.first().subscribe(func);

注:当它们的条件满足时,.take(1).first()都会自动取消订阅

RxJS 5.5+ 的更新

来自Coderer的评论。

import { first } from 'rxjs/operators'
    
observable
  .pipe(first())
  .subscribe(func);

这里是原因。


46
订阅会自动清理吗? - Berkeley Martinez
57
是的。当它们的条件达成时,"Take" 和 "First" 都会取消订阅。请注意,这里的取消订阅指的是观察者模式中的订阅机制。 - Brandon
22
文档为什么没有说明它会自动处理订阅的释放? - jzig
23
在 RxJS 中有一个通用规则:当可观察流结束时,订阅将被取消。这意味着任何“缩短”流(或将其转换为不同类型的流)的操作符,当它们的操作完成时都会从源流中取消订阅。我不确定这是否在文档中说明。 - Brandon
50
如果有人在2018年访问此内容,您实际上需要使用 rxjs/operators 中的 first 函数,使用 observable.pipe(first()).subscribe(func) 将其订阅。请注意,这里仅提供翻译,不包括任何解释或其他额外内容。 - Coderer
显示剩余9条评论

37
RxJS有一些我所见过的最好的文档。点击下面的链接将带您进入一个非常有用的表格,将使用案例映射到操作符。例如,在“我想要获取第一个值”这个使用案例下,有三个操作符:firstfirstOrDefaultsample
请注意,如果可观察序列在没有通知的情况下完成,那么first操作符会向订阅者发送错误通知,而firstOrDefault操作符会向订阅者提供默认值。 操作符使用案例查找

11

更新(2021年12月):

由于 RxJS 7 中的 toPromise() 函数已被弃用,因此新函数已宣布用于替代它。这些函数是 firstValueFromlastValueFrom

firstValueFrom 函数解析第一个发出的值并直接取消订阅资源。当 Observable 在不发出任何值的情况下完成时,它将拒绝并显示 EmptyError

另一方面,lastValueFrom 函数在某种程度上与 toPromise() 相同,因为它在 observable 完成时解析最后一个发出的值。但是,如果 observable 没有发出任何值,它将拒绝并显示 EmptyError。与 toPromise() 解析 undefined 不同,当没有值发出时。

要了解更多信息,请查看docs


新答案:

如果你只想调用一次Observable,这意味着你不会等待它的流。因此,在你的情况下,使用toPromise()而不是subscribe()就足够了,因为toPromise()不需要取消订阅。


非常有趣,这也意味着我们可以只使用 await 来等待 promise,从而使其变成一行代码。 - Louie Almeda
@LouieAlmeda,你能给我们举个一行代码的例子吗? - Ado Ren
嗨@AdoRen,我在这里为您提供了一份答案,希望能对您有所帮助。 - Louie Almeda
toPromise() 更像是 observable.pipe(last()) 而不是 first() - Mrk Sef
是的,“Promise”的答案应该是:await observable.pipe(take(1)).toPromise() - Brandon

3

补充一下@Brandon的回答,使用first()或类似方法对基于ObservableBehaviorSubject进行更新也是必要的。例如(未经测试):

Original Answer翻译成“最初的回答”。

var subject = new BehaviorSubject({1:'apple',2:'banana'});
var observable = subject.asObservable();

observable
  .pipe(
    first(), // <-- Ensures no stack overflow
    flatMap(function(obj) {
      obj[3] = 'pear';
      return of(obj);
    })
  )
  .subscribe(function(obj) {
    subject.next(obj);
  });

2

简洁方便的版本

在M Fuat NUROĞLU关于将可观察对象转换为Promise的惊人回答的基础上,这是一个非常方便的版本。

const value = await observable.toPromise();

console.log(value)

这样做的好处是,我们可以像正常变量一样使用该值,而无需引入另一个嵌套块!
当您需要从多个可观察对象中获取多个值时,这尤其方便。整洁简明。
const content = await contentObservable.toPromise();
const isAuthenticated = await isAuthenticatedObservable.toPromise();

if(isAuthenticated){
   service.foo(content)
}

当然,如果你采用这种方法,你必须将包含函数设为async。如果不想使包含函数成为async,也可以直接使用.then来处理promise。
我不确定这种方法是否存在权衡之处,请随时在评论中告诉我,以便我们知道。
附言:如果你喜欢这个答案,请不要忘记给M Fuat NUROĞLU的答案点赞 :)

4
请注意,只有当底层的Observable完成后,Promise才会被解析。因此,如果.toPromise()不起作用,请检查您的Observable是否已经完成。例如,可以使用"take(1)"来解决问题:const value = await observable.pipe(take(1)).toPromise()请参阅此评论:https://github.com/ReactiveX/rxjs/issues/2536#issuecomment-306041463 - zauni

2

我有类似的问题。

以下内容是来自不同状态更改器的后续调用。因为我不想要。

function foo() {
    // this was called many times which was not needed
    observable.subscribe(func);
    changeObservableState("new value");
}

我决定在订阅后尝试使用unsubscribe()

function foo() {
    // this was called ONE TIME
    observable.subscribe(func).unsubscribe();
    changeObservableState("new value");
}

subscribe(func).unsubscribe();就像subscribeOnce(func)

希望这也对您有所帮助。


1

observable.pipe(take(1)).subscribe() 使用 take 1,它将订阅一次,然后退出


2
请添加更多细节以扩展您的答案,例如工作代码或文档引用。 - Community

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接