将Observable<String[]>转换为Observable<DataType[]>

3
我有一个返回给我一个Array<string>的api,根据原始id(一对多)。我需要在每个id上进行http请求,以从api中获取关联数据。我无法想出如何将Observable<string[]>映射到Observable<DataType[]>
如果有可能的话,我想保留原始observable并使用运算符来获得所需的结果。
由于observable中唯一的项是数组,因此在该情况下,管道化map运算符无效。
这里有一些示例代码,类似于我正在尝试实现的实现。
getIds = (originalId: string) => {
 return this.http.get<string[]>(url);
}

getDataFromIds = (originalId: string): Observable<DataType[]> => {
  const ids$ = this.getIds(originalId);
  // Make http calls for each of the items in the array.
  result = ids$.pipe();

  return result;
}

1
大致上,您想要将switchMap切换到forkJoin。然而,您尚未描述如果其中一个请求失败应该发生什么。 - Ingo Bürk
好观点。我没有考虑到HTTP失败的可能性。如果请求失败,它应该继续到下一个ID,不包括失败的响应。 - Buttars
2个回答

3

这是一个使用switchMap操作符的示例,通常情况下,您的内部observable会使用forkJoin操作符。

getIds = (originalId: string) => {
 return this.http.get<string[]>(url);
}

getDataFromIds = (originalId: string): Observable<DataType[]> => {
  const ids$ = this.getIds(originalId);
  // Make http calls for each of the items in the array.
  result = ids$.pipe(switchmap(ids => forkJoin(ids.map(id => this.getId(id))));
  // map the array of ids into an array of Observable<DataType>, forkjoin them and switch into it.

  return result;
}

这里假设getIds()函数会返回一组字符串id,并且你有一个getId()函数,该函数接受一个字符串id并返回一个可观察的DataType对象。

我们最终得到了相同的结果,只是我使用了combineAll()操作符。相比于forkJoin(),我的方法有什么劣势吗? - Buttars
1
在这种用例中最终结果是相同的。微妙的区别在于,forkJoin 只会发出一个值:即所有内部可观察对象完成后的最终结果数组。combineAll 可能会发出多个值:当每个内部可观察对象产生一个值时,以及任何内部可观察对象产生额外值时。由于您的内部可观察对象在产生1个值后就终止了,因此这种区别变得无关紧要。不过我会使用 forkJoin,因为它似乎更符合语义。 - Brandon
1
这里的结果相同。通常情况下,我们更喜欢使用forkJoin,因为combineLatest的预期是内部流具有多个发射,而forkJoin则声明您期望单个发射。 - bryan60

0
你可以试试这个:
ids$.pipe(
  switchMap(ids => //you can swap switchMap with any *Map operator: https://www.learnrxjs.io/operators/transformation/
    forkJoin(...ids.map(id => //you swap forkJoin with any comb. operator: https://www.learnrxjs.io/operators/combination/
      from(Promise.resolve({ id })).pipe(
        map(res => res.id),
        catchError(err => of(err)))))));

from, forkJoin 的导入应该来自于 rxjs,而其他所有内容都应该从 rxjs/operators 中导入。

catchError 将会捕获任何未处理的错误。

Demo: https://stackblitz.com/edit/rxjs-xmmhyj


1
这是一个不好的使用zip的情况,因为它是一个非终止的observable,通常在这种情况下,您需要使用一个完成的observable(forkJoin)来避免内存泄漏。使用zip可能需要不必要的订阅清理工作。 - bryan60
forkJoin有一个缺点,需要observable完成才能传播最终结果,这可能会阻塞流,您应该真正考虑使用不同操作符的权衡。每种情况都是不同的。 - Zdravko Tatarski
1
OP指定了这些是HTTP调用问题,这意味着它们完成并且forkJoin是此用例的正确运算符。 - bryan60

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接