forkJoin不能与AngularFire2的valueChanges一起使用。

8
请帮我解决一个问题,我一直在苦苦挣扎。
我有一个 Firebase 对象键的数组。
const keys = ['-Kx9pqoMWlJLbKLQcAkP', '-Kx9pqoOYlDHTJ64Was5']

我想做的是使用forkJoin将所有Firebase对象合并到一个流中。 这是我的代码:

const obj1 = this.fbService.getObj(keys[0]);
const obj2 = this.fbService.getObj(keys[1]);

forkJoin([obj1, obj2])
    .subscribe(res => {
        console.log(res);  // <-- this never happens
    };

fbService方法是:

getObj(key): Observable<MyObj> {
  return this.fb.object(`/path/to/obj/${key}`).valueChanges();
}

我假设这个getObj方法和forkJoin一起使用时不能正常工作,可能是因为valueChanges,请问我的使用方式正确吗?
但是:
  • getObj works fine for getting single Firebase object, like:

    this.fbService.getObj(keys[0])
        .subsribe(res => console.log(res))// <-- works
    
  • forkJoin works fine with simple HTTP requests, like

    const r1 = this.http.get('https://swapi.co/api/people/1');
    forkJoin([r1])
        .subscribe(res => {
            console.log(res);  // <-- works
        };
    

那么,我做错了什么呢? 我的目标是从键的数组中获取对象数组:

['-Kx9pqoMWlJLbKLQcAkP', '-Kx9pqoOYlDHTJ64Was5'] => [{prop:'val'},{prop:'val2'}]

你试过这个 this.fbService.getObj(keys[1]) 是否也可以工作吗?因为如果 forkJoin 中的多个可观察对象中有任何一个失败,整个流都会失败。 - CozyAzure
是的,我尝试过了。this.fbService.getObj(keys[1]) 是可以工作的,但当它在 forkJoin 中时,即使单独使用,它也不起作用。 - Ivan
3个回答

10
< p > forkJoin 操作符要求所有源 Observable 至少发出一个项并完成。 < /p>< p > 我对 Firebase 不是很了解,但我怀疑 valueChanges 从不完成,这就是为什么 forkJoin 从未发出任何内容的原因。 解决此问题的一种方法是使用 take(1) 始终完成链。< /p>
forkJoin(obj1.take(1), obj2.take(1)).subscribe(res => console.log(res);

或许在你的情况下,最好使用zip()运算符,因为它只需要所有源Observable发出相同数量的项。但是请确保取消订阅它,因为它不会在其源Observables完成之前自行完成。


2
我刚刚查看了文档。是的,它们从来没有完成。我猜 OP 可以选择使用 .combineLatest() - CozyAzure
这是一个很好的观点,但是为什么当我单独使用valueChanges时它会起作用呢?this.fb.object(...).valueChanges().subscribe(WORKS) - Ivan
2
因为第一个参数是每个项目调用的“next”处理程序。如果您想检查Observable是否完成,可以使用例如:.subscribe(undefined, undefined, () => console.log('complete')) - martin
.combineLatest() 在这种情况下是可以的,但请注意它会在任何源 Observable 发出时都进行发射,因此您可能会重复获取相同的值。 - martin
@Matiishyn 这意味着流永远不会完成。当您订阅可观察对象时,您只是“反应”流。如果您想知道流是否已完成,可以将代码放入complete()处理程序中。但您会意识到它们永远不会被执行。 - CozyAzure
嘿,大家好,我正在努力让combinelatest与这个确切的问题配合工作。你们能告诉我你们是如何解决它的吗? - Ross Rawlins

1

@martin已提供正确答案,但是代码示例需要更新为新的RxJS语法:

forkJoin(obj1.pipe(take(1)), obj2.pipe(take(1))).subscribe(res => console.log(res));

0
尝试使用 startWith 运算符。
getObj(key): Observable<MyObj> {
  return this.fb.object(`/path/to/obj/${key}`)
     .valueChanges.pipe(starWith(value)); // value can be default obj to trigger stream
}

可能是因为 valueChanges 只有在手动触发流之后才开始发出,所以 forkJoin 只有在所有源 Observable 至少发出一次后才会触发;因此,我们立即让它们都发出默认值一次。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接