如何使用 RxJS 6.x 和 Node.js 合并一个 Observable 数组?

4
为了学习目的,我正在创建一个Node应用程序,需要从一个数组中获取x个RxJS Observables并将其合并成单个事件流。我想知道任何Observable中发生的事件(不按顺序或完全完成)。我觉得它应该在一个单一的合并事件流中。基本上,任何Observable中传递的第一个事件都将完成。
为此,我觉得merge()可以解决问题。由于合并不能直接将数组作为参数,因此到目前为止,我正在使用reduce来帮助合并。
然而,最终结果不是observable,而是一个函数。我也无法订阅它。下面是代码的简化版本。
如何更改此Node 10.14.2,RxJS 6.4.x代码以返回observable而不是我可以附加“.subscribe()”的“[function]”?
const { Observable } = require('rxjs');
const { merge } = require('rxjs/operators');

const observables = [
    Observable.create(observer => observer.next('Hello')),
    Observable.create(observer => observer.next('Hello')),
    Observable.create(observer => observer.next('Hello'))
];

const mergedObservables = observables.reduce((merged, observable) => {
    console.log(observable);
    return merge(merged, observable);
});

// outputs:
// Observable { _isScalar: false, _subscribe: [Function] }
// Observable { _isScalar: false, _subscribe: [Function] }

console.log(mergedObservables);

// outputs:
// [Function]

mergedObservables.subscribe();
// error:
// TypeError: mergedObservables.subscribe is not a function
1个回答

8

编辑:你导入的是merge操作符,而不是静态的merge函数。前者在源可观察对象上操作事件,而后者从一个或多个源可观察对象创建新的可观察对象。尽管下面关于展开语法的想法简化了您的代码,但这不是真正的问题。


看起来Node.js >= 5.0支持在函数调用中对数组使用展开运算符(不过您没有指定正在使用哪个版本的Node.js)。如果您正在使用现代版本的Node.js,则以下内容应该可以工作:

const { Observable, merge } = require('rxjs')

const observables = [
    Observable.create(observer => observer.next('Hello')),
    Observable.create(observer => observer.next('Hello')),
    Observable.create(observer => observer.next('Hello'))
]

const mergedObservables = merge(...observables)
mergedObservables.subscribe(event => { console.log(event) })

1
这让我更接近了。我确实喜欢这里使用展开运算符的方式。然而,结果与您提供的示例相同:“TypeError: mergedObservables.subscribe不是一个函数”。我正在使用Node版本10.14.2,并将在问题中进行更正。 - Christopher Stevens
唉呀!那是因为我使用了 运算符 而不是 _函数_(前者作用于可观察对象的事件,而后者创建一个新的可观察对象)。我会更新答案。 - seniorquico
谢谢你的回答。这对我帮助很大。我没意识到有一个合并运算符和函数。 - Christopher Stevens

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接