取消订阅RxJS Observables

11
我有这两个对象,我想停止监听它们的事件。我完全不了解observables和RxJS,只是试图使用Inquirer库。
以下是RxJS API供参考:http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html 我该如何取消订阅这些类型的observables? ConnectableObservable:
   ConnectableObservable {
     source: EventPatternObservable { _add: [Function], _del: [Function], _fn: undefined },
     _connection: ConnectDisposable { _p: [Circular], _s: [Object] },
     _source: AnonymousObservable { source: [Object], __subscribe: [Function: subscribe] },
     _subject: 
      Subject {
        isDisposed: false,
        isStopped: false,
        observers: [Object],
        hasError: false } },
  _count: 1,
  _connectableSubscription: 
   ConnectDisposable {
     _p: 
      ConnectableObservable {
        source: [Object],
        _connection: [Circular],
        _source: [Object],
        _subject: [Object] },
     _s: AutoDetachObserver { isStopped: false, observer: [Object], m: [Object] } } }

FilterObservable:

FilterObservable {
  source: 
   RefCountObservable {
     source: 
      ConnectableObservable {
        source: [Object],
        _connection: [Object],
        _source: [Object],
        _subject: [Object] },
     _count: 1,
     _connectableSubscription: ConnectDisposable { _p: [Object], _s: [Object] } },
  predicate: [Function] }

我需要取消订阅这些对象:

'use strict';
var rx = require('rx');

function normalizeKeypressEvents(value, key) {
  return {value: value, key: key || {}};
}

module.exports = function (rl) {

  var keypress = rx.Observable.fromEvent(rl.input, 'keypress', normalizeKeypressEvents)
    .filter(function (e) {
      // Ignore `enter` key. On the readline, we only care about the `line` event.
      return e.key.name !== 'enter' && e.key.name !== 'return';
    });

  return {
    line: rx.Observable.fromEvent(rl, 'line'),

    keypress: keypress,

    normalizedLeftKey: keypress.filter(function (e) {
      return e.key.name === 'left';
    }).share(),

    normalizedRightKey: keypress.filter(function (e) {
      return e.key.name === 'right';
    }).share(),

    normalizedUpKey: keypress.filter(function (e) {
      return e.key.name === 'up' || e.key.name === 'k' || (e.key.name === 'p' && e.key.ctrl);
    }).share(),

    normalizedDownKey: keypress.filter(function (e) {
      return e.key.name === 'down' || e.key.name === 'j' || (e.key.name === 'n' && e.key.ctrl);
    }).share(),

    numberKey: keypress.filter(function (e) {
      return e.value && '123456789'.indexOf(e.value) >= 0;
    }).map(function (e) {
      return Number(e.value);
    }).share(),

    spaceKey: keypress.filter(function (e) {
      return e.key && e.key.name === 'space';
    }).share(),

    aKey: keypress.filter(function (e) {
      return e.key && e.key.name === 'a';
    }).share(),

    iKey: keypress.filter(function (e) {
      return e.key && e.key.name === 'i';
    }).share()
  };
};

我目前的最佳猜测是没有像这样显式调用subscribe:

var source = Rx.Observable.fromEvent(input, 'click');

var subscription = source.subscribe(
  function (x) {
    console.log('Next: Clicked!');
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });

但实际上,这里有这些调用:
events.normalizedUpKey.takeUntil(validation.success).forEach(this.onUpKey.bind(this));
events.normalizedDownKey.takeUntil(validation.success).forEach(this.onDownKey.bind(this));

所以我最好的猜测是,我需要一种方法来使takeUntil调用无效/取消。
3个回答

24

能否告诉我上述代码中,该调用是否在内部发生?很显然,我所处理的代码有效地订阅了可观察事件,但是我在代码中没有看到任何明确的 subscribe() 调用… - Alexander Mills
1
看起来 rx.Observable.fromEvent() 在这里发挥作用,但我不知道如何停止它! - Alexander Mills
这有一点帮助 => https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/fromevent.md - Alexander Mills
我认为要得到一个被接受的答案,你必须帮助我撤销 takeUntil 调用 :) - Alexander Mills

3
我赞同第一位评论者的观点。 不过,感觉代码中需要有这样一个调用:
let subscription = normalizedUpKey.subscribe( data => console.log('data') );

你可以做的是:
subscription.unsubscribe()

如果没有on事件,你怎么知道什么时候发生了什么,或者这是第三方库的一部分吗?

想要了解更多关于Rxjs的内容,我建议您查看这本免费书籍:https://www.gitbook.com/book/chrisnoring/rxjs-5-ultimate/details


1

不需要也没有协议可以取消订阅一个可观测对象。实际上,我看到了您的问题中的代码,特别是那个返回对象的部分,其中包含了由 share 组合的一堆可观测对象。然而,这些可观测对象仍然是可观测对象,而不是订阅,这意味着对于这些元素,不存在“取消订阅”的概念。

因此,如果您在模块之外有一些类似订阅的新代码,并且正好与事件可观测对象一起工作,您显然可以取消订阅特定的订阅实例。

目前,对于问题中的代码,源可观测对象使用的方法都是操作符,比如 .filter().share().takeUntil(),而不是订阅执行,它们实际上是返回新可观测对象的方法。

正如 Rxjs 官方文档所描述的那样,尽管 .share() 创建了 多播可观测对象,但在使用某些便捷的操作符时,如果订阅者数量从 1 减少到 0,执行仍然可能会停止,您的代码中的 .share() 正好也被包括在内。

总之,在您提出的问题中,不需要担心事件取消订阅您的代码。可能存在另一个问题,听起来像是您问题中描述的关注点:如果您使用.connect()而不是.share()。这是关于ConnectableObservable的情况,您需要考虑手动取消事件绑定。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接