RxJS合并观察者在回调中

6
我需要帮助把一个函数改为返回Observable的形式。我的函数(我们称之为mainFunction)调用了一个异步函数(我们称之为getAsyncWithCallback),并执行回调函数。回调函数中一部分是一个返回Observable的异步调用(我们称之为getData)。 getAsyncWithCallback是库的一部分,我无法更改它。
这是我的代码:
mainFunction(){
    getAsyncWithCallback(() => {
        let myObservable = getData();
    });
}

这是我想要做的事情:
mainFunction().subscribe((data) => { 
    // data is the data from getData()
});

什么是实现这一目标的最佳方式,并且如何将内部的错误和完成形式连接起来?

如果他们不返回myObservable(或将其分配到其他地方),我认为你无法做太多事情... - acdcjunior
我相信你可以使用 Promise 并将其转换为 Observable。我目前没有 IDE 来为您提供示例。 - MistyK
回调函数是我可以更改的代码。我在其中定义了 myObservable - Grey
2个回答

2

我的解决方案:

mainFunction(): Observable<any> {

    return Rx.Observable.create(observer => {
        getAsyncWithCallback((): void => {
            getData().subscribe(observer);
        })
    })
}

mainFunction().subscribe((data) => {
    console.log(data);
})); 

这将创建一个可观察的可观察对象,因为getData返回一个Observable。我认为你需要在其中某个地方抛出一个mergeMap/flatMap - Pace
1
我认为最好写成getData().subscribe(observer),而不是像@Pace建议的那样使用操作符。 - Oles Savluk
1
现在你所拥有的不会将源可观察对象到观察者的错误或完成链接起来。此外,我认为你的意思是observer.next而不是Observer.next。@Oles Savluk也提出了一个很好的观点(将链接所有事件),但我仍然不喜欢它,因为它强制订阅内部可观察对象,即使没有人订阅外部可观察对象。 - Pace
我明白了。你说得对,关于错误和完整性,我没有想到在调用函数之前会强制订阅。函数在定义后直接解析吗?我不确定我理解Oles的观点。subscribe(observer)会做什么? - Rex
1
@RavenDev 看一下,.subscribe 接受一个实现了 Observer 接口的对象,而你的 observer 变量正好实现了这个接口。所以基本上它会为你调用 observer.nextobserver.errorobserver.complete(而不是像你手动写的那样)。 - Oles Savluk
显示剩余4条评论

0

你需要从mainFunction返回一个Observable。

如果getData返回的是其他类型的对象而不是Observable,你可以使用Observable.create构建自己的Observable:

function mainFunction() {
  return Observable.create((obs) => {
    getAsyncWithCallback(() => {
      getData()
        .subscribe((data) => { obs.next(data); })
        .catch((err) => { obs.error(err) });
    });
  });
}

mainFunction().subscribe(data => console.log(data));

来自文档:

create 将 onSubscription 函数转换为实际的 Observable。每当有人订阅该 Observable 时,该函数将被调用,并将 Observer 实例作为第一个且唯一的参数传递。

使用 next 调用值将向观察者发出该值。

大多数情况下,您不应该需要使用 create,因为现有的操作符允许您为大多数用例创建 Observable。话虽如此,如果您有非常特定的需求,create 是一种低级机制,允许您创建任何 Observable。

还有 Observable.from,它可以从数组、类似数组的对象、迭代器、类似 Observable 的对象和 Promises 创建 Observables。


第一个选项不能解决这个问题,因为我需要调用getAsyncWithCallback()。至于第二个选项,它不能链接错误和完成。我会在我的问题中添加这个信息。 - Grey
1
@Grey 不确定你的意思,因为在你的示例中,你没有处理回调失败的情况。不过这应该是一个很好的开始,你可以使用obs.error()或obs.complete()来表示observable的错误或完成。 - user9903
@RudolfOlah 你不能在 .subscribe 之后执行 .catch,也没有发送完成信号。最好按照我在另一个答案的评论中描述的那样编写 .subscribe(obs) - Oles Savluk
@OlesSavluk 谢谢,目前没有控制台来测试这个。我想知道是否可以使用stackoverflow可运行的代码功能...另外也许我们应该创建一个社区维基答案,这样你的评论就可以作为答案的一部分显示出来。 - user9903

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接