在subscribe中调用subscribe好吗?

80
我需要从三个不同的API传递三个数据到一个函数中:

我需要从三个不同的API传递三个数据到一个函数中:

this.service.service1().subscribe( res1 => {
  this.service.service1().subscribe( res2 => {
    this.service.service1().subscribe( res3 => {
      this.funcA(res1, res2, res3);
  });
  });
});

在一个subscribe里再次订阅是个好习惯吗?


2
没有什么“伤害”,但这被认为是 rxjs 中的反模式。如果这三个调用是独立的,那么这样做会使程序变得不必要的缓慢。 - Ingo Bürk
@PankajParkar 不,它们不是相互依赖的。如果我进行独立调用,我将无法将数据传递给funcA。 - Yashwanth Gurrapu
7个回答

101

正确的方法是以某种方式组合各个可观察对象,然后订阅整个流 - 如何组合取决于您的确切要求。

如果您可以并行执行它们:

forkJoin(
  this.service.service1(), this.service.service2(), this.service.service3()
).subscribe((res) => {
  this.funcA(res[0], res[1], res[2]);
});

如果每个操作的结果都依赖于上一个操作的结果,您可以使用mergeMap(以前称为flatMap)或switchMap

this.service.service1().pipe(
  mergeMap((res1) => this.service.service2(res1)),
  mergeMap((res2) => this.service.service3(res2))
).subscribe((res3) => {
  // Do something with res3.
});

...等等。有许多运算符可以组合可观察对象以涵盖许多不同的场景。


1
forkJoin 会确保执行顺序吗? - Pankaj Parkar
3
它会按编码顺序返回结果,但我不确定它是否保证按指定顺序开始。然而,由于它们是HTTP请求,即使以特定顺序启动它们,除非你按顺序形成它们,否则无法保证服务器接收和处理它们的顺序 - 因此,如果这很重要,你需要使用类似flatMap链的东西,在开始下一个之前等待一个请求完成。 - Mark Hughes
2
假设这些是HTTP请求,你会如何处理每种情况下的HTTP错误,特别是在flatMap示例中?另外,如果我理解正确,如果我们需要确保完成的顺序,我们需要使用concatMap而不是flatMap:https://www.learnrxjs.io/operators/transformation/concatmap.html - AsGoodAsItGets
4
使用concatMap和flatMap取决于您的用例,@AsGoodAsItGets - 在上面的示例中,两者都可以确保完成顺序。关于错误处理,在上述flatMap示例中,如果(例如)service2出错,则不会调用service3,并且它将转到传递给subscribe的错误处理函数(如果您传递了一个)。如果您想更早地捕获它们,可以在管道中使用catchError... - Mark Hughes
2
如果您想单独处理每个服务的错误怎么办?例如,一个用于处理服务1故障的错误处理程序,另一个用于处理服务2故障的错误处理程序等等。我无法想象实际情况中会有超过2个订阅的情况,但只是想知道它将如何构建。 - AsGoodAsItGets
显示剩余2条评论

15

虽然上述所有方法都有助于提供解决此特定问题的解决方案,但它们似乎都没有解决明显的根本问题,即:

在subscribe内部调用subscribe是否是一种好的方式?

简而言之

,在subscribe内部调用subscribe并不是一个好的方式。


为什么?
因为这不是函数式编程的工作方式。你没有思考函数式,而是过程式。这并不一定是问题,但使用rxjs(和其他响应式编程扩展)的整个目的是编写功能代码。
我不会详细介绍什么是函数式编程,但基本上函数式编程的重点是将数据视为流。由函数操作的流,并由订阅者消耗。一旦在另一个订阅中添加了一个订阅,你就在消费者内部操作数据(而不是在流内部)。因此,你的功能流现在已经被破坏了。这会阻止其他消费者在你的代码中进一步利用该流。所以你把你的功能流变成了一个过程。

enter image description here

图片来源,在此处可以找到更多关于纯函数式编程的信息


7

你可以使用forkJoinObservables组合成一个单一值的Observable

forkJoin(
  this.service.service1(),
  this.service.service2(),
  this.service.service3()
).pipe(
  map(([res1, res2, res3 ]) => {
    this.funcA(res1, res2, res3);
  })

2
我认为这两者并不等价,因为你在此处并行调用,而作者是按顺序执行。 - Yura

2

1

看起来有些奇怪,我会选择这种方式,因为它看起来更整洁:

async myFunction () {
//...
const res1 = await this.service.service1().toPromise();
const res2 = await this.service.service2().toPromise();
const res3 = await this.service.service3().toPromise();
this.funcA(res1, res2, res3);
//...

}

编辑

或者并行地执行它

async myFunction () {

//...
let res1;
let res2;
let res3;
[res1,res2,res3] = await Promise.all([this.service.service1().toPromise(),
                                      this.service.service2().toPromise(),
                                      this.service.service3().toPromise()]);
this.funcA(res1, res2, res3);
//...

}

1
这里存在同样的问题,你是顺序执行了这三个调用而不是并行执行,导致速度变慢(假设它们可以并行运行)。 - Ingo Bürk
4
RxJS 的最佳实践答案不应该是“将所有内容都转换为 Promise”,虽然在某些场景下(如 HTTP 调用)这种方法可能有效,但 Observables 的优美之处在于可以工作于流而不是像 Promise 那样单次响应,如果将它们转换为 Promise,就失去了使用 Observables 的意义。 - Mark Hughes

0

你可以使用zip RxJs操作符,在这种情况下,你只需要使用一个subscribe。

然后你可以在那个subscribe中调用你的函数,因为所有的结果都是可用的。

Observable.zip(
  this.service.service1(),
  this.service.service1(),
  this.service.service1()
).subscribe([res1, res2, res3]) {
  this.funcA(res1, res2, res3);
}

0

如上所述,forkJoin是一个不错的解决方案,但它仅发出已完成的调用。如果这些值将被重复发出,请使用combineLatest。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接