如何在RxJS中等待一个Promise解决并跳过中间元素?

4

假设我有一个流(可观察对象),其中包含一些元素:

--a---b-c---d--

如果我有一个函数,它接受其中的一个元素并返回一个Promise,比如一个请求,然后我使用flatMap和这个函数,那么结果响应流将会是这样的(大写字母表示响应):

--a---b-c---d--
----A----B---CD

但这意味着对于 c 的请求将在 b 的请求结束之前开始。假设我想要避免执行对 c 的请求,并得到以下结果:
--a---b-c---d--
----A----B----D

我应该如何解决这个问题?


在以下代码中,我有一个流,在1、2、4和7秒后发出。我有一个需要2秒才能完成的request函数。我希望该函数仅在1、4和7时调用(不是在2时,因为1的请求还没有完成)。

const Rx = require('rx');

const logNext      = x => console.log(new Date(), 'Next:', x);
const logError     = x => console.log(new Date(), 'Error:', x);
const logCompleted = () => console.log(new Date(), 'Completed.');

Rx.Observable.fromArray([1, 2, 4, 7])
  .flatMap(x => Rx.Observable.of(x).delay(x * 1000))
  .flatMapFirst(request)
  .subscribe(logNext, logError, logCompleted);


function request(x) {
  console.log(`Starting request with ${x}`);
  return new Promise(resolve => {
    setTimeout(
      () => {
        console.log(`Finishing request with ${x}`);
        resolve(x)
      },
      2000
    );
  })
}

flatMapFirst可以生成正确的响应流,但我希望避免调用request(2)产生的副作用。

1个回答

3
如果您使用rxjs v4,可以尝试使用flatMapFirst。我无法确定在rxjs v5中是否存在该操作符。从文档中可以看出:
flatMapFirst操作符类似于上面描述的flatMap和concatMap方法,但是与发出由转换源Observable的项目生成的所有Observable发出的所有项目不同,flatMapFirst仅在第一个Observable完成之前传播它独占的第一个Observable,然后才开始订阅下一个Observable。在当前Observable完成之前到达的Observables将被删除并且不会传播。
代码可能如下所示:
source$.flatMapFirst(makeRequest)

这里将会发生的是,传入的b将导致创建makeRequest(b),对于 c 也是同样的情况。然而,makeRequest(c)将永远不会被订阅,这意味着它在实现中可能包含的影响将不会执行。
如果makeRequest本身(即函数而非可观察的makeRequest(x))实际上正在执行某些效果以创建其可观察输出,并且您想要防止这种情况,则可以使用defer
source$.flatMapFirst(x => Rx.Observable.defer(() => makeRequest(x)))

您还可以查看先前的回答,以了解更多使用 defer 的示例:


3
是的,如果你正在使用 RxJS 5,flatMapFirst 已经被重命名,改为 exhaustMap - cartant
抱歉,虽然这很有帮助,但我忘了提到我想完全避免调用该函数。这可行吗?我会更新问题以包括这一点。 - Franco Victorio
你所说的避免完全调用函数是什么意思?你需要发布一些代码,但原则上,如果你的可观察对象没有被订阅,那么就不会执行任何操作。虽然在某些情况下可以执行请求,只需将响应放入可观察对象中,此时可以使用defer来延迟请求在订阅时执行。如果您发布一些代码,我可以更清楚地解释这一点。 - user3743222

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接