将 Promise 转换为 Observable

414

我正在试图理解可观察对象。我喜欢可观察对象解决开发和可读性问题的方式。随着我的阅读,好处是巨大的。

在 HTTP 和集合上使用可观察对象似乎很直接。我该如何将这样的东西转换为可观察对象模式。

这是来自我的服务组件,用于提供身份验证。我希望它像 Angular2 中的其他 HTTP 服务一样工作——支持数据、错误和完成处理程序。

firebase.auth().createUserWithEmailAndPassword(email, password)
  .then(function(firebaseUser) {
    // do something to update your UI component
    // pass user object to UI component
  })
  .catch(function(error) {
    // Handle Errors here.
    var errorCode = error.code;
    var errorMessage = error.message;
    // ...
  });

非常感谢任何在这里提供的帮助。我唯一想到的替代方案是创建EventEmitter,但我认为这是服务部分中可怕的做法。


1
我认为这个问题不仅仅是关于Angular或Firebase的,它更关乎rxjs。 - Gherman
9个回答

610

如果您正在使用 RxJS 6.0.0:

import { from } from 'rxjs';
const observable = from(promise);

23
在使用版本为6.3.3的情况下,from方法返回一个Observable对象,但是它会将Promise作为值发送给订阅者。:( - Laxmikant Dange
1
这个回答适用于RXJS 6+。我尝试通过“直觉”从operators进行导入 - 结果是错误的。 - VSO
10
这个答案不正确,有时只能起到作用。Promise是急切的,而Observable是懒惰的(只有在订阅时才开始)。使用这个解决方案,即使没有 '.subscribe()',Promise已经开始了,因此它的行为不像Observable。请参考答案https://dev59.com/1VkS5IYBdhLWcg3w5KEi#69360357获取更好的解决方案。 - Llorenç Pujol Ferriol
3
请注意这个解决方案,因为它可能导致未处理的 Promise 拒绝,从而破坏您的代码!如果您正在使用 from 运算符,则必须在 from 运算符内部的 Promise 中处理 Promise 拒绝(使用 .catch()),否则 Promise 可能会执行并抛出错误,而其包装的 Observable 尚未被订阅,如果您的错误处理被管道传输到 Observable,则不会捕获错误!(我更喜欢使用 defer,并从 Observable 中央处理错误)。 - coderrr22

211

1 直接执行 / 转换

使用from直接将先前创建的Promise转换为Observable。

import { from } from 'rxjs';

// getPromise() is called once, the promise is passed to the Observable
const observable$ = from(getPromise());

observable$ 是一个热 Observable,它有效地将 Promise 的值重新发送给订阅者。

它是一个热 Observable,因为生产者(在这种情况下是 Promise)是在 Observable 外部创建的。多个订阅者将共享相同的 Promise。如果内部 Promise 已经解决,Observable 的新订阅者将立即获得其值。

2. 每次订阅时延迟执行

使用defer和一个 Promise 工厂函数作为输入来延迟创建和转换 Promise 到 Observable。

import { defer } from 'rxjs';

// getPromise() is called every time someone subscribes to the observable$
const observable$ = defer(() => getPromise());

observable$将是一个冷Observable

它是一种冷Observable,因为生产者(Promise)是在Observable内部创建的。每个订阅者都会通过调用给定的Promise工厂函数来创建一个新的Promise。

这允许您创建一个observable$,而无需立即创建和执行Promise,并且不与多个订阅者共享该Promise。 每个observable$的订阅者实际上都会调用from(promiseFactory()).subscribe(subscriber)。因此,每个订阅者都会创建并转换自己的新Promise为一个新的Observable,并将自己附加到这个新的Observable上。

3 Many Operators Accept Promises Directly

大多数RxJS操作符,如合并(例如mergeconcatforkJoincombineLatest ...)或转换Observable(例如switchMapmergeMapconcatMapcatchError ...)直接接受promises。如果您已经在使用其中之一,您不必先使用from来包装promise(但为了创建一个冷Observable,您仍然可能需要使用defer)。

// Execute two promises simultaneously
forkJoin(getPromise(1), getPromise(2)).pipe(
  switchMap(([v1, v2]) => v1.getPromise(v2)) // map to nested Promise
)

查看文档实现,以查看您正在使用的操作符是否接受ObservableInputSubscribableOrPromise

type ObservableInput<T> = SubscribableOrPromise<T> | ArrayLike<T> | Iterable<T>;
// Note the PromiseLike ----------------------------------------------------v
type SubscribableOrPromise<T> = Subscribable<T> | Subscribable<never> | PromiseLike<T> | InteropObservable<T>;

fromdefer在一个例子中的区别:https://stackblitz.com/edit/rxjs-6rb7vf

const getPromise = val => new Promise(resolve => {
  console.log('Promise created for', val);
  setTimeout(() => resolve(`Promise Resolved: ${val}`), 5000);
});

// the execution of getPromise('FROM') starts here, when you create the promise inside from
const fromPromise$ = from(getPromise('FROM'));
const deferPromise$ = defer(() => getPromise('DEFER'));

fromPromise$.subscribe(console.log);
// the execution of getPromise('DEFER') starts here, when you subscribe to deferPromise$
deferPromise$.subscribe(console.log);

defer是许多应用程序依赖于可观察对象的冷启动和在订阅时触发数据获取的最常用运算符。尽管如此,对于某些用例,例如当您想要在某个初始化过程中创建一次Promise,然后通过可观察对象传播其值以供多次订阅,但不想为每个订阅者创建和执行Promise时,from仍然是一个可行的选项。


8
我认为差异非常重要,感谢你指出来。 - Starscream
6
感谢分享,许多操作员可以直接接受Promise!今天我学到了。 - BobbyTables
1
非常感谢您提供的详细说明!它不仅帮助我解决了问题,还帮助我更好地理解了相关概念。 - helhum

142

试一试:

import 'rxjs/add/observable/fromPromise';
import { Observable } from "rxjs/Observable";

const subscription = Observable.fromPromise(
    firebase.auth().createUserWithEmailAndPassword(email, password)
);
subscription.subscribe(firebaseUser => /* Do anything with data received */,
                       error => /* Handle error here */);

你可以在这里找到有关fromPromise操作符的完整参考。


69

将 Promise 转换为 Observable 的正确模式是使用 deferfrom 操作符:

import { defer, from } from 'rxjs';
    
const observable$ = defer(() => from(myPromise()));

我们为什么需要defer运算符?

Promise是急切的,这意味着当调用时它们会立即执行。这与observable的工作方式相反。Observable是惰性的,只有在调用.subscribe()时才会触发。这就是为什么我们总是需要把它包装成一个defer运算符的原因。 from运算符不会执行这项工作,因此defer始终是必需的


我无法想象在仅使用“from”时会失败的情况。你能分享一个这样行为不同的例子吗? - Tobias S.
这是一个例子:https://typescript-hhrcxj.stackblitz.io在这个例子中,我们应该在.subscribe上启动Promise,并且必须使用defer()运算符。如果没有defer运算符,则不需要订阅即可启动Promise。因此,在某些情况下,Promise已经在订阅之前解决了,这是要避免的一种模式。 - Llorenç Pujol Ferriol
@TobiasS。我现在有另一个例子。如果不失败,一个SignalR调用会返回一个"errorResult"事件,所以我不得不重复调用。rxJs运算符replay没有起作用,因为承诺已经执行。使用defer(()=>from),SignalR调用将被重播。 - Marcus Kaseder
@TobiasS。我现在有另一个例子。如果一个 SignalR 调用没有失败,它会返回一个 "errorResult" 事件,所以我不得不重新执行这个调用。由于 promise 已经执行过了,所以 rxJs 运算符 replay 并不起作用。通过使用 defer(()=>from),SignalR 调用可以重新执行。 - undefined

4
import { from } from 'rxjs';

from(firebase.auth().createUserWithEmailAndPassword(email, password))
.subscribe((user: any) => {
      console.log('test');
});

以下是将您的代码从 promise 转换为 observable 的一些答案的组合,形成了一个更短的版本。


3
虽然这段代码可能解决了问题,但是包括一份解释说明如何以及为什么能够解决问题将有助于提高您帖子的质量,并可能导致更多的赞。请记住,您正在回答未来读者的问题,而不仅仅是现在提问的人。请[编辑]您的答案以添加解释,并指出适用的限制和假设。 - janw

4

您可以添加一个包装器来使用 promise 功能并返回一个 Observable 给观察者。

  • 使用 defer() 操作符创建一个 延迟 的 Observable,它允许您在 Observer 订阅时才创建 Observable。
import { of, Observable, defer } from 'rxjs'; 
import { map } from 'rxjs/operators';


function getTodos$(): Observable<any> {
  return defer(()=>{
    return fetch('https://jsonplaceholder.typicode.com/todos/1')
      .then(response => response.json())
      .then(json => {
        return json;
      })
  });
}

getTodos$().
 subscribe(
   (next)=>{
     console.log('Data is:', next);
   }
)


3
您也可以使用 defer。主要区别在于 Promise 不会急切地解决或拒绝。

2
您也可以使用一个Subject并从promise触发其next()函数。下面是示例: 添加以下代码(我使用了服务):

class UserService {
  private createUserSubject: Subject < any > ;

  createUserWithEmailAndPassword() {
    if (this.createUserSubject) {
      return this.createUserSubject;
    } else {
      this.createUserSubject = new Subject < any > ();
      firebase.auth().createUserWithEmailAndPassword(email,
          password)
        .then(function(firebaseUser) {
          // do something to update your UI component
          // pass user object to UI component
          this.createUserSubject.next(firebaseUser);
        })
        .catch(function(error) {
          // Handle Errors here.
          var errorCode = error.code;
          var errorMessage = error.message;
          this.createUserSubject.error(error);
          // ...
        });
    }

  }
}

像下面这样从组件创建用户

class UserComponent {
  constructor(private userService: UserService) {
    this.userService.createUserWithEmailAndPassword().subscribe(user => console.log(user), error => console.log(error);
    }
  }


主题是低级机器。除非您正在扩展 rxjs,否则请不要使用主题。 - polkovnikov.ph
我只是提供一个解决方案。 - Shivang Gupta
你至少可以展示一下使用 new Observable(observer => { ... observer.next() ... }) 的方式来实现它。即使这是一个已知函数的重新实现,但它可以直接回答问题,对读者也没有伤害。 - polkovnikov.ph

-4

Rxjs 提供了 toPromise() 操作符, 就像代码示例所演示的:

@Injectable({
  providedIn: 'root'
})
export class InventoryService {
  constructor(private httpClient: HttpClient) {}

  getCategories(): Observable<Category[]> {
    const url = 'https://www.themealdb.com/api/json/v1/1/categories.php';

    return this.httpClient.get<CategoriesResponse>(url).pipe(
      map(response => response.categories)
    );
  }
}

在你的组件内部,你可以应用toPromise()操作符:

export class AppComponent {
  categories: any[];

  constructor(private inventoryService: InventoryService) {}

  public async loadCategories() {
    this.categories = await this.inventoryService
      .getCategories()
      .**toPromise()**

但是目前 Rxjs7+ 已经被弃用,建议使用 lastValueFrom() 操作符:

  public async loadCategories() {
    const categories$ = this.inventoryService.getCategories();
    this.categories = await **lastValueFrom**(categories$);
  }

我希望这个新版本的代码可以帮到你 :')


运算符正在询问相反的内容。 - Torsten Barthel

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接