RxJs中的Observable数组转换为数组的Observable

30

我需要处理使用Angular2和TypeScript编写的Web应用程序中的RxJs Observable。由于我以前从未使用过rxjs并且对反应式编程也不熟悉,因此有时会难以找到完成某些特定任务的正确方法。
目前,我面临一个问题,需要将Array<Observable<T>>转换为Observable<Array<T>>
以下是一个示例来解释:

  • 我有一个Observable,它给我提供了一个User列表(Observable<Array<User>>
  • User类有一个函数getPosts返回Observable<Array<Post>>
  • 我需要将Observable<Array<User>>映射到Observable<Array<Post>>,以便在onNext函数中评估所有Posts。

我可以轻松地使用map((result : Array<User>) => result.map((user : User) => user.getPosts()))Observable<Array<User>>映射到Observable<Array<Observable<Array<Post>>>>,并将Array<Array<Post>>扁平化为Array<Post>
但是,我无法找到正确的方法将Observable<Array<Observable<Array<Post>>>>映射到Observable<Array<Array<Post>>>。目前,我使用combineLatest函数和flatMap函数一起使用。在我使用的编辑器(Atom Editor)中,它似乎可以工作,并且没有显示任何错误。但是现在我使用Netbeans,在这段代码中它向我显示了一个错误。同时,使用“tsc”编译代码将导致以下错误:

Argument of type '(result: Post[]) => void' is not assignable to parameter of type 'NextObserver<[Observable<Post>]> | ErrorObserver<[Observable<Post>]> | CompletionObserver<[...'.
Type '(result: Post[]) => void' is not assignable to type '(value: [Observable<Post>]) => void'.  

所以我的问题是:
如何将一个ObservablesArray“展平”成一个Array


getPosts 方法对应什么?它会加载特定用户的帖子吗? - Thierry Templier
5个回答

41
flatMap 操作符可以实现这一点。我不完全理解你试图做什么,但我会尝试提供一个答案...

如果您想加载所有的

getPostsPerUser() {
  return this.http.get('/users')
    .map(res => res.json())
    .flatMap((result : Array<User>) => {
      return Observable.forkJoin(
        result.map((user : User) => user.getPosts());
    });
}

Observable.forkJoin 允许您等待所有 observables 都收到数据。

上面的代码假设 user.getPosts() 返回一个 observable...

使用它,您将收到一个帖子数组的数组:

this.getPostsPerUser().subscribe(result => {
  var postsUser1 = result[0];
  var postsUser2 = result[1];
  (...)
});

1
是的,getPosts() 返回一个 Observable<Array<Post>>。所以我想这就是我要找的。我会尽快试一下并告诉你是否有效。谢谢。 - Robert P
1
完美运作。我只需要用 forkJoin 替换 combineLatest。非常感谢! - Robert P

10

你可以像这样,在想要的rxjs方法上使用apply函数:

const source1 = Rx.Observable.interval(100)
  .map(function (i) { return 'First: ' + i; });

const source2 = Rx.Observable.interval(150)
  .map(function (i) { return 'Second: ' + i; });

const observablesArray = [source1, source2];

const sources = Rx.Observable.combineLatest
.apply(this, observablesArray).take(4)

/* or you can write it without "apply" this way:
const sources = Rx.Observable
                .combineLatest(...observablesArray)
                .take(4)
*/
sources.subscribe(
  (response) => {
    console.log(response);
  }
)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>


我猜这个也可以工作,但是我现在还是坚持使用forkJoin - Robert P

5

以下是使用rxjs@6.3.3的示例代码

import { of, forkJoin} from 'rxjs';
import { Observable } from 'rxjs'

const source:Array<Observable<String>> = [of('A'), of('B'), of('C') ];

forkJoin(source).subscribe( (x)=> console.log(x))

https://arrayofobservablesexamp.stackblitz.io


4

使用forkJoin操作符对这个Observable数组进行处理,然后它将成为一个数组的Observable。


0

使用 toArray()

range(0, 7).pipe(
  concatMap(v => of(`day_${v}`)),
  toArray() // here
).subscribe(a => {
  console.log(a); // ['day_0', 'day_1', ... 'day_6']
});

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接