Observable.forkJoin和数组参数

48
在Observables的forkJoin文档中,它说args可以是一个数组,但没有列出一个示例来展示如何使用:

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/forkjoin.md

我尝试了类似以下代码的函数,但遇到了错误:

:3000/angular2/src/platform/browser/browser_adapter.js:76 

EXCEPTION: TypeError: Observable_1.Observable.forkJoin is not a function

以下是我函数的剪切版本:

processStuff( inputObject ) {
  let _self = this;

  return new Observable(function(observer) {
    let observableBatch = [];

    inputObject.forEach(function(componentarray, key) {
      observableBatch.push(_self.http.get(key + '.json').map((res: Response) => res.json()));
    });

    Observable.forkJoin(
      observableBatch
    // );
    ).subscribe(() => {
      observer.next();
      observer.complete();
    });

  });
}
我的问题根源涉及到一个循环,在这里要求循环结束后再继续进行,详情请参见:Angular2 Observable - how to wait for all function calls in a loop to end before proceeding?。但是,我还没有完全掌握如何正确使用带有数组参数的forkJoin以及正确的语法。非常感谢您能提供的帮助。

注:第三个返回Observable的函数示例

thirdFunction() {
  let _self = this;

  return Observable.create((observer) => {
  // return new Observable(function(observer) {
    ...

    observer.next(responseargs);
    observer.complete();
  });
}

processStuff(inputObject) {
  let _self = this;
  let observableBatch = [];

  inputObject.forEach((componentarray, key) => {
    observableBatch.push(_self.thirdFunction().map((res: Response) => res.json()));
  });

  return Observable.forkJoin(observableBatch);
}

elsewhere() {
  this.processStuff(inputObject)
    .subscribe()
}

1
Observable.forkJoin([http.get(), http.get(), http.get(), etc]).subscribe((res) => ...请参考:https://dev59.com/-1sW5IYBdhLWcg3wOk_O#35247372 - Eric Martinez
3
你是否忘记了导入import 'rxjs/add/observable/forkJoin'或者import 'rxjs/Rx' - Sasxa
我只是有这个import {Observable} from "rxjs/Observable"; - Benjamin McFerren
2
请参考这里,其他运算符/生成器同样适用。 - Sasxa
2个回答

93

您需要导入不会默认加载的运算符。 这通常是指EXCEPTION Observable.xxxx is not a function。 您可以通过将rxjs完整引入到启动文件中来导入所有运算符,例如:

import 'rxjs/Rx'

或者通过导入特定的运算符,就像您的情况一样:

import 'rxjs/add/observable/forkJoin'

关于你的代码,我有一个建议:尽量坚持一种语法。你混合使用es5、es6和typescript...虽然能工作,但长远来看只会让你更加困惑。此外,如果你刚开始接触Observables,请避免使用new Observable(),而是使用创建操作符。

processStuff( inputObject ) {
  let observableBatch = [];

  inputObject.forEach(( componentarray, key ) => {
    observableBatch.push( this.http.get( key + '.json').map((res: Response) => res.json()) );
  });

  return Observable.forkJoin(observableBatch);
}

elsewhere() {
  this.processStuff( inputObject )
    .subscribe()
}

最后,参考正确的文档 - Angular2使用的是RxJS v5,而您提供的链接是RxJS v4的。目前v5的文档仍不完整,但您可以在许多源文件中找到描述。


嗨@Sasxa,我遇到了麻烦,因为我正在使用第三个函数而不是this.http.get(...)。你能否提供一个例子,其中第三个函数返回new Observable(function(observer) {的示例?我已经编辑了我的原始帖子以显示一个示例。非常感谢您的帮助。 - Benjamin McFerren
搞定了 - 第三个函数需要这样写:return Observable.create((observer) => { ... observer.next("test"); observer.complete(); - Benjamin McFerren
我喜欢这个解决方案。唯一让我无法理解的是如何处理一些请求的错误,当大多数请求都成功时? - Darryl Wagoner WA1GON
2
在最新版本中应该是 "return forkJoin(observableBatch)" 而不是 "return Observable.forkJoin(observableBatch)"。 - Volodymyr
@Volodymyr的答案是与Angular 7+兼容的,导入方式为import {forkJoin} from 'rxjs'; - Biiz

2
这里是使用Angular 12的工作演示。语法与被接受的答案相比有很大的不同(没有Observable)。请参考以下链接:https://stackblitz.com/edit/angular-map-to-forkjoin?file=src/app/app.component.ts。注意:打开开发者工具控制台可查看输出内容。首先,它将一个JSON项数组(在本例中为一个用户数组)转换为使用map方法的单个HTTP Patch调用。
  private mapCalls(users: any[]): Observable<any>[] {
    return users.map(user => {
      // Each User will be patched to the endpoint
      // None of these calls will be made until forkJoin is used
      return this.http.patch(
        `https://jsonplaceholder.typicode.com/users/${user.id}`,
        user
      );
    });
  }

现在这是一个Observable数组。为了使这些调用被执行,forkJoin作为一个包装器被添加。

return forkJoin(mappedCalls);

最终结果可以被订阅,输出为一个具有相同项目数的单一数组。

注意:如果forkJoin中有一个调用失败,所有调用都将失败。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接