如何等待 Promise 解决并使用 RxJS 过滤器

4
在我的可观察源中,我收到事件,我希望过滤某个异步操作。例如:
s$.pipe(
    map((x) => x + 1),
    filter((x) => alreadyExist(x)))
  .subscribe(...)

alreadyExist是一个异步操作时,(检查值是否存在于持久存储中),它返回一个布尔值。

假设alreadyExist返回一个解析为布尔值的promise,我该如何等待返回结果?请注意,我需要保证值不会改变。使用async-await无效,并且订阅在过滤器返回之前执行。


现在不行,稍后我会告诉你,谢谢。 - itaied
2个回答

9

如果alreadyExist返回一个boolean,你可以使用mergeMap来等待它的结果。如果它是true,那么它将把它映射到原始的x值中。当它是false时,它将被filter过滤掉。

s$.pipe(
    map((x) => x + 1),
    mergeMap(x => from(alreadyExist(x)).pipe(
      filter(Boolean),
      mapTo(x),
    )),
  )
  .subscribe(...)

你是如何“管道化”这个Promise的? - itaied
如果alreadyExist返回一个Promise,你需要使用from来包装它。 - martin

7
你可以尝试我的建议 - 使用 switchMap 管道。
let isAlreadyExist = false; // "global" variable :)
s$.pipe(
    map((x) => x + 1),
    switchMap(async (x) => {
      isAlreadyExist = await alreadyExist(x); // make sure alreadyExist is a promise func
      return x; // "do" do nothing with your data
    }),
    filter((x) => isAlreadyExist)) // isAlreadyExist is value of alreadyExist func
  .subscribe(...)

或者您可以阅读有关 RxJS Observabledefer的内容。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接