如何在Rx Observable上使用`await`?

199
我希望您能在可观察对象上等待,例如:
const source = Rx.Observable.create(/* ... */)
//...
await source;

一种天真的尝试会导致await立即解析而不会阻塞执行。

编辑: 我完整预期用例的伪代码如下:

if (condition) {
  await observable;
}
// a bunch of other code

我明白我可以将其他代码移入另一个单独的函数并将其传递到订阅回调中,但我希望能够避免这样做。


你可以把剩余的代码(想要等待源代码)放进 .subscribe()方法中吗? - StriplingWarrior
7个回答

220
你需要将一个 Promise 传递给 `await`。将可观察的下一个事件转换为 Promise 并等待它。 ```

你需要将一个 Promise 传递给 await。将可观察的下一个事件转换为 Promise 并等待它。

```
if (condition) {
  await observable.first().toPromise();
}

编辑说明:此回答原本使用了.take(1),但已更改为使用.first(),以避免在流结束之前没有值传递时,Promise永远无法解决的问题。

从RxJS v8开始,toPromise将被移除。相反,可以使用await firstValueFrom(observable)替换上述内容。


19
如果完成时没有任何值,first() 将会返回拒绝状态,而 take(1) 将会返回一个挂起的 Promise。 - Estus Flask
9
@apricity @AgentME,实际上在这种情况下,您都不应该使用take(1)first()。由于您期望只发生一个事件,因此应该使用single(),如果有多个事件将抛出异常,而当没有事件时则不会抛出异常。 如果有多个事件,则可能是代码/数据模型等方面存在问题。如果不使用single(),则会任意选择返回的第一项,而不警告还有更多的选项。您必须在上游数据源的谓词中注意始终保持相同的顺序。 - ntziolis
5
别忘了导入:import 'rxjs/add/operator/first'; - Stephanie
14
既然toPromise()已经弃用,我们应该如何操作? - Jus10
5
@Jus10,我找不到任何表明 toPromise() 已被弃用的内容。你在哪里看到的? - NickL
显示剩余9条评论

69

请使用新的firstValueFrom()或者lastValueFrom()代替toPromise()。 如此处所指出的,从RxJS 7开始已经将其废弃,并将在RxJS 8中删除。

import { firstValueFrom} from 'rxjs';
import { lastValueFrom } from 'rxjs';

this.myProp = await firstValueFrom(myObservable$);
this.myProp = await lastValueFrom(myObservable$);

这在 RxJS 7+ 中可用。

参见:https://indepth.dev/rxjs-heads-up-topromise-is-being-deprecated/


30

很可能必须是这样

await observable.first().toPromise();

如之前所注释的那样,当空的可观察流被完成时,take(1)first() 操作符之间存在重大差别。

Observable.empty().first().toPromise() 将导致被拒绝,并显示 EmptyError,可以相应地处理,因为确实没有值。

Observable.empty().take(1).toPromise() 将返回一个解析值为 undefined 的结果。


实际上,take(1)不会产生未决的Promise。它将返回一个已解析为undefined的Promise。 - Johan t Hart
谢谢您的注意,没错。我不确定为什么帖子会有所不同,可能是某个时候发生了行为变化。 - Estus Flask

19

编辑:

.toPromise() 在 RxJS 7 中已弃用(来源:https://rxjs.dev/deprecations/to-promise)。

新答案:

作为 toPromise() 方法的替代方案,你应该使用内置的静态转换函数 firstValueFrom 或 lastValueFrom 中的一个。

例如:

import { interval, lastValueFrom } from 'rxjs';
import { take } from 'rxjs/operators';
 
async function execute() {
  const source$ = interval(2000).pipe(take(10));
  const finalNumber = await lastValueFrom(source$);
  console.log(`The final number is ${finalNumber}`);
}
 
execute();
 
// Expected output:
// "The final number is 9"
如果你的 toPromise 被弃用了,你可以使用 .pipe(take(1)).toPromise。但是如你所见在 这里,它并没有被弃用。因此请使用 toPromise (RxJs 6)。
//return basic observable
const sample = val => Rx.Observable.of(val).delay(5000);
//convert basic observable to promise
const example = sample('First Example')
  .toPromise()
  //output: 'First Example'
  .then(result => {
    console.log('From Promise:', result);
  });

async/await示例:

//return basic observable
const sample = val => Rx.Observable.of(val).delay(5000);
//convert basic observable to promise
const example = await sample('First Example').toPromise()
// output: 'First Example'
console.log('From Promise:', result);

在此处阅读更多


toPromise已被弃用,并且在RxJS 8中已被删除。您链接中的文档已过时。 - nmfzone
Rx 是什么意思? - Jmainol

12
你需要等待一个 Promise,因此你需要使用 toPromise()。有关 toPromise() 的更多详细信息,请参见此处

5

toPromise()在RxJs 7及以后版本中已被弃用,不推荐使用。你可以使用RxJs 7中的两个新操作符lastValueFrom()firstValueFrom()。更多详情请参见此处

const result = await lastValueFrom(myObservable$);

此处提供Beta版本的实现:


0

我正在使用RxJS V 6.4.0,因此我应该在V 7.x.x中使用已弃用的toPromise()。受其他答案的启发,这是我对toPromise()所做的事情。

import { first, ... } from 'rxjs/operators';

...

if (condition) {
  await observable$.pipe(first()).toPromise();
}

...


请注意我在pipe()中使用了last()。因为在我的observable.first()不像macil所说的那样工作。
希望这能帮助其他像我一样使用RxJS V 6.x.x的人 :)
谢谢。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接