如何完成一个包含多个Observable的Observable

4

嗨,我有一个 (NgRx) Action,它触发了一个函数(应该返回一个Observable),该函数具有一个循环,每次循环我都需要发出(next)一个值,在循环中获取数据的函数返回一个 Observable,当所有 observable 完成时,我想完成具有循环的 observable:

以下是代码:

      public outerFunction(
        collections: someObject
      ): Observable<someOtherObject> {

        const outerObservable$ = new Observable<Observable<someOtherObject>>(
          (observer) => {

            const collKeys = Object.keys(collections);
            for (const id of collKeys) {
              if (collections[id]) {
                const innerObs$ = this.functionThatReturnsObs(
                  collections[id]
                )
                observer.next(innerObs$);
              }
              // observer.complete(); <-- If I complete here the stream is interrupted
            }
          }
        );
return outerObservable$.pipe(mergeAll());
    }
2个回答

4

实际上,我从RxJs文档中找到了答案。

由于在最后调用了mergeAll(),所以完全没有必要使用complete。

mergeAll订阅发出Observables的Observable,也称为高阶Observable。每次它观察到这些发出的内部Observables之一时,它都会订阅该内部Observable,并将所有来自内部Observable的值传递到输出Observable中。只有当所有内部Observables完成时,输出Observable才会完成。

最终,mergeAll()将完成我的observable。


3

你可以尝试这个。

return outerObservable$.pipe(mergeMap(res=>merge(res)))

如果那样不起作用,你可以像处理普通数组一样处理它。
   public outerFunction(
        collections: someObject
      ): Observable<someOtherObject> {
            const innerObs$=[]
            const collKeys = Object.keys(collections);
            for (const id of collKeys) 
              if (collections[id]) 
                innerObs$.push(this.functionThatReturnsObs(
                  collections[id]
                ))

             return merge(innerObs$);
    }

嗨,好的,那么我应该在哪里完成? - ALGDB
循环结束后 - Fan Cheung

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接