在 RxJS 中链接 Observables

85

我正在学习 RxJS 和 Angular 2。假设我有一个承诺链,其中包含多个异步函数调用,这些调用依赖于前一个调用的结果,它看起来像:

var promiseChain = new Promise((resolve, reject) => {
  setTimeout(() => {
    resolve(1);
  }, 1000);
}).then((result) => {
  console.log(result);

  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(result + 2);
    }, 1000);
  });
}).then((result) => {
  console.log(result);

  return new Promise((resolve, reject) => {
      setTimeout(() => {
      resolve(result + 3);
        }, 1000);
  });
});

promiseChain.then((finalResult) => {
  console.log(finalResult);
});

我尝试仅使用 RxJS 而不使用 Promises 实现相同功能,结果如下:

var observableChain = Observable.create((observer) => {
  setTimeout(() => {
    observer.next(1);
    observer.complete();
  }, 1000);
}).flatMap((result) => {
  console.log(result);

  return Observable.create((observer) => {
    setTimeout(() => {
      observer.next(result + 2);
      observer.complete()
    }, 1000);
  });
}).flatMap((result) => {
  console.log(result);

  return Observable.create((observer) => {
    setTimeout(() => {
      observer.next(result + 3);
      observer.complete()
    }, 1000);
  });
});

observableChain.subscribe((finalResult) => {
  console.log(finalResult);
});

它产生与 Promise 链相同的输出。我的问题是:

  1. 我做得对吗?是否有任何 RxJS 相关的改进可以应用到上面的代码中?

  2. 如何使这个可观察链重复执行?即使在末尾添加另一个订阅也只会产生额外的 6,但我期望它打印出 1、3 和 6。

    observableChain.subscribe((finalResult) => { console.log(finalResult); });

    observableChain.subscribe((finalResult) => { console.log(finalResult); });

    1 3 6 6


已经在这里放置了使用rxjs6链接承诺的工作示例:https://dev59.com/QlsW5IYBdhLWcg3w2aOK#55991374 - arcseldon
2个回答

62
关于Promise组合和Rxjs的比较,这是一个经常被问到的问题,您可以参考之前在SO上提出的一些问题,其中包括:

基本上,flatMap等价于Promise.then

对于您的第二个问题,您是想重放已经发出的值,还是想在新值到达时处理它们?如果是前者,请检查publishReplay操作符。如果是后者,标准订阅就足够了。但是,根据您的源(参见热和冷的观察对象:是否有“热”和“冷”操作符?以获得该概念的图解说明),您可能需要注意冷与热的二分法。


4
在 RXJs 6+ 版本中,flatMap 已被弃用,请使用 mergeMap 替代。 - mak15
.pipe(mergeMap(() => forkJoin({first: first$, second: second$,}))); 如果你想要启动超过一个。 - Petri Ryhänen

1

澄清的例子:

管道的顶部可以发出n个值(这回答了“如何使此可观察链重复执行”的问题),但随后链接的流只会发出一个值(因此模拟了承诺)。

// Emit three values into the top of this pipe
const topOfPipe = of<string>('chaining', 'some', 'observables');

// If any of the chained observables emit more than 1 value
// then don't use this unless you understand what is going to happen.
const firstObservablePipe = of(1); 
const secondObservablePipe = of(2);
const thirdObservablePipe = of(3);
const fourthObservablePipe = of(4);

const addToPreviousStream = (previous) => map(current => previous + current);
const first = (one) => firstObservablePipe.pipe(addToPreviousStream(one));
const second = (two) => secondObservablePipe.pipe(addToPreviousStream(two));
const third = (three) => thirdObservablePipe.pipe(addToPreviousStream(three));
const fourth = (four) => fourthObservablePipe.pipe(addToPreviousStream(four));

topOfPipe.pipe(
  mergeMap(first),
  mergeMap(second),
  mergeMap(third),
  mergeMap(fourth),
).subscribe(console.log);

// Output: chaining1234 some1234 observables1234

你也可以使用concatMap或switchMap。它们之间都有微妙的区别。请查看rxjs文档以了解详情。

mergeMap: https://www.learnrxjs.io/learn-rxjs/operators/transformation/mergemap

concatMap: https://www.learnrxjs.io/learn-rxjs/operators/transformation/concatmap

switchMap: https://www.learnrxjs.io/learn-rxjs/operators/transformation/switchmap


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接