RxJS:我如何“手动”更新Observable?

172

我认为我一定是对某个基本概念产生了误解,因为我的理解中这应该是 Observable 的最基本用例,但是我无论如何都无法从文档中找出如何实现。

基本上,我想要做到这一点:

// create a dummy observable, which I would update manually
var eventObservable = rx.Observable.create(function(observer){});
var observer = eventObservable.subscribe(
   function(x){
     console.log('next: ' + x);
   }
...
var my_function = function(){
  eventObservable.push('foo'); 
  //'push' adds an event to the datastream, the observer gets it and prints 
  // next: foo
}

但是我没有找到像 push 这样的方法。我正在为一个点击处理程序使用它,我知道他们有 Observable.fromEvent 来处理这个,但我想在 React 中使用它,而不是使用完全不同的事件处理系统。所以基本上我想要这个:

$( "#target" ).click(function(e) {
  eventObservable.push(e.target.text()); 
});

我最接近的方法是使用observer.onNext('foo'),但似乎并没有真正起作用,并且这是在观察者上调用的,这似乎不正确。观察者应该是对数据流做出反应的东西,而不是改变它,对吗?

我是不是不理解观察者/可观察者的关系?


看看这个,以澄清你的想法(你一直缺少的响应式编程介绍):https://gist.github.com/staltz/868e7e9bc2a7b8c1f754。在这里,还有许多资源可以帮助你提高理解能力:https://github.com/Reactive-Extensions/RxJS#resources。 - user3743222
我已经查看了第一个,它似乎是一个可靠的资源。第二个是一个很棒的列表,在其中我找到了 http://aaronstacy.com/writings/reactive-programming-and-mvc/ 这个网址,帮助我发现了 Rx.Subject,这解决了我的问题。所以谢谢!一旦我写了更多的应用程序,我会发布我的解决方案,只是想要进行一些测试。 - LiamD
呵呵,非常感谢你提出这个问题,我正想问同样的问题,并且已经有了相同的代码示例在脑海中 :-) - arturh
3个回答

176
在RX中,观察者(Observer)和被观察者(Observable)是不同的实体。观察者订阅一个Observable,当Observable调用观察者方法时,它会向其观察者发出项目。如果您需要在Observable.create()作用域之外调用观察者方法,则可以使用Subject,它是一个代理,同时充当观察者和Observable。
您可以像这样操作:
var eventStream = new Rx.Subject();

var subscription = eventStream.subscribe(
   function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

var my_function = function() {
  eventStream.next('foo'); 
}

你可以在这里找到有关主题的更多信息:


1
这实际上就是我最终采取的方法!我一直在努力寻找更好的方式来完成我需要做的事情,但这绝对是一个可行的解决方案。我最初在这篇文章中看到了它:http://aaronstacy.com/writings/reactive-programming-and-mvc/。 - LiamD
2
如果无法使用Subjects而必须使用Observables,应该如何实现? - Ian Steffy
1
@IanSteffy 如果您创建一个新的Observable,并从旧的Observable和新的Subject中创建(通过merge),那么您可以使用此Subject来供给新的Observable,这样会怎么样呢? - PEZO

40

我相信 Observable.create() 方法不接受 observer 作为回调参数,而是一个发射器(emitter)。所以如果您想向您的Observable 添加新值,请尝试以下操作:

var emitter;
var observable = Rx.Observable.create(e => emitter = e);
var observer = {
  next: function(next) {
    console.log(next);
  },
  error: function(error) {
    console.log(error);
  },
  complete: function() {
    console.log("done");
  }
}
observable.subscribe(observer);
emitter.next('foo');
emitter.next('bar');
emitter.next('baz');
emitter.complete();

//console output
//"foo"
//"bar"
//"baz"
//"done"

使用Subject可以更方便地提供Observable和Observer,但并非完全相同,因为当一个Observable只向最后一个订阅的Observer发送数据时,Subject允许您向同一个Observable订阅多个观察者,因此请谨慎使用。

如果您想要尝试,请点击这里JsBin


RxJS手册中有关于覆盖emitter属性的文档吗? - Tomas
在这种情况下,emitter 只会为订阅了最后一个观察者的新值调用 next()。更好的方法是将所有的 emitter 收集到一个数组中,并遍历它们并在每个 emitter 上调用 next 方法来传递值。 - Eric Gopak
用什么替换已弃用的 Observable.create() 调用呢?我尝试了 new Observable(emitter),但它的行为不如我所预期的那样。https://stackoverflow.com/q/65060800/958373 - Stephane

-1

变量观察者 = Observable.subscribe(

function(x){

 console.log('next: ' + 

var my_function = function(){

Observable.push('hello')

更新可观察对象的一种方式。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接