检索可观察对象的订阅者并使它们订阅另一个可观察对象。

4

简单来说

给定一个尚未完成的现有Observable,是否有一种方法可以检索相关的订阅者(传递给subscribe的函数)以使它们订阅另一个Observable?

背景

我的应用程序中的服务帮助创建SeverEvent连接,返回一个ConnectableObservable以代理连接并使用publish运算符允许多播。该服务通过内部存储跟踪现有的连接:

store: {[key: string]: ConnectionTracker};

// …

interface ConnectionTracker {
    url: string;
    eventSource: EventSource;
    observable: rx.ConnectableObservable<any>;
    subscription: rx.Subscription;
    observer: rx.Observer<any>;
    data?: any; // Arbitrary data
}

在连接创建时,如果已经存在相关联的跟踪器(使用连接端点进行身份验证),则该服务应该:

  • 关闭现有跟踪器的 ServerEvent 连接。
  • 打开新的 ServerEvent 连接(因此是一个新的 ConnectableObservable)。
  • 用新的 Observable 替换现有的跟踪器 Observable,但现有的订阅者现在应该订阅新的 Observable 而不是旧的

下面是创建 ConnectionTracker 的代码部分:

/**
* Create/Update a ServerEvent connection tracker
*/
createTracker<T>(endpoint: string, queryString: string = null): ConnectionTracker
{
    let fullUri = endpoint + (queryString ? `?${queryString}` : '')
        , tracker = this.findTrackerByEndpoint(endpoint) || {
            observable: null,
            fullUri: fullUri,
            eventSource: null,
            observer: null,
            subscription: null
        }
    ;

    // Tracker exists
    if (tracker.observable !== null) {
        // If fullUri hasn't changed, use the tracker as is
        if (tracker.fullUri === fullUri) {
            return tracker;
        }

        // At this point, we know "fullUri" has changed, the tracker's
        // connection should be replaced with a fresh one

// ⇒ TODO
// ⇒ Gather old tracker.observable's subscribers/subscriptions to make
//   them subscribe to the new Observable instead (created down below)

        // Terminate previous connection and clean related resouces
        tracker.observer.complete();
        tracker.eventSource.close();
    }

    tracker.eventSource = new EventSource(<any>fullUri, {withCredentials: true});
    tracker.observable = rx.Observable.create((observer: rx.Observer<T>) => {
            // Executed once
            tracker.eventSource.onmessage = e => observer.next(JSON.parse(e.data));
            tracker.eventSource.onerror = e => observer.error(e);
            // Keep track of the observer
            tracker.observer = observer;
        })
        // Transform Observable into a ConnectableObservable for multicast
        .publish()
    ;

    // Start emitting right away and also keep a reference to 
    // proxy subscription for later disposal
    tracker.subscription = tracker.observable.connect();

    return tracker;
}

谢谢你。

看起来你可以只使用 switchMap 来返回“新”的 Observable。订阅者将保持不变,但从这个“新”的 Observable 接收值。 - martin
我担心_switchMap_不是我要找的。我的意图是创建一个可插拔的替代Observable,但从先前的Observable中恢复注册的观察者。一旦ServerEvent连接关闭,相关的Observable就变得过时了(没有源→不再调用_next()_),对Observable的引用可以被丢弃。_switchMap_只构建一个Observable链:原始的Observable理论上仍然在工作(据我所知),但因为相关的连接已关闭,不会再发出任何值,新的Observable也没有机会接管。 - Stphane
2个回答

2

不要手动尝试将订阅者从一个 Observable 转移到另一个 Observable,而是应该提供一个 Observable,当需要时会自动切换到另一个 Observable。

为此,您可以使用高阶 Observable(发出 Observables 的 Observable),它始终切换到最新的内部 Observable。

基本概念

// a BehaviorSubject is used so that late subscribers also immediately get the most recent inner Observable
const higherOrderObservable = new BehaviorSubject<Observable<any>>(EMPTY);

// pass new Observable to listeners
higherOrderObservable.next(new Observable(..));

// get most recent inner Observable
const currentObservable = higherOrderObservable.pipe(switchMap(obs => obs));
currentObservable.subscribe(valueFromInnerObservable => { .. })

针对您的情况

对于每个端点,创建一个BehaviorSubject (跟踪器提供者),它会发出应该当前用于该端点的Observable (跟踪器)。当应该使用给定端点的不同跟踪器时,将这个新的Observable传递给BehaviorSubject。让您的监听器订阅BehaviorSubject (跟踪器提供者),它会自动为他们提供正确的跟踪器,即切换到应该当前使用的Observable。

您的代码的简化版本可能如下所示。具体细节取决于您在整个应用程序中如何使用函数createTracker

interface ConnectionTracker {
  fullUri: string;
  tracker$: ConnectableObservable<any>;
}

// Map an endpoint to a tracker supplier.
// This is your higher order Observable as it emits objects that wrap an Observable
store: { [key: string]: BehaviorSubject<ConnectionTracker> };
closeAllTrackers$ = new Subject();

// Creates a new tracker if necessary and returns a ConnectedObservable for that tracker. 
// The ConnectedObservable will always resemble the current tracker.
createTracker<T>(endpoint: string, queryString: string = null): Observable<any> {
  const fullUri = endpoint + (queryString ? `?${queryString}` : '');
  // if no tracker supplier for the endpoint exists, create one
  if (!store[endpoint]) {
    store[endpoint] = new BehaviorSubject<ConnectionTracker>(null);
  }
  const currentTracker = store[endpoint].getValue();

  // if no tracker exists or the current one is obsolete, create a new one
  if (!currentTracker || currentTracker.fullUri !== fullUri) {
    const tracker$ = new Observable<T>(subscriber => {
      const source = new EventSource(fullUri, { withCredentials: true });
      source.onmessage = e => subscriber.next(JSON.parse(e.data));
      source.onerror = e => subscriber.error(e);
      return () => source.close(); // on unsubscribe close the source
    }).pipe(publish()) as ConnectableObservable<any>;
    tracker$.connect();
    // pass the new tracker to the tracker supplier
    store[endpoint].next({ fullUri, tracker$ });
  }
  // return the tracker supplier for the given endpoint that always switches to the current tracker
  return store[endpoint].pipe(
    switchMap(tracker => tracker ? tracker.tracker$ : EMPTY), // switchMap will unsubscribe from the previous tracker and thus close the connection if a new tracker comes in
    takeUntil(this.closeAllTrackers$) // complete the tracker supplier on emit
  );
}

// close all trackers and remove the tracker suppliers
closeAllTrackers() {
  this.closeAllTrackers$.next();
  this.store = {};
}

如果您想一次性关闭所有跟踪器连接,并且现有的订阅者应该收到“complete”通知,请调用closeAllTrackers。 如果您只想关闭一些跟踪器连接,但不希望现有的订阅者接收到“complete”通知,以便他们继续监听未来提供的新跟踪器,请为每个跟踪器调用store[trackerEndpoint].next(null)

如果我需要一次性断开所有现有连接怎么办?循环_store_自身属性并在每个BehaviorSubject上调用_.complete()_是否足够? - Stphane
你可能不想完成BehaviorSubjects本身,因为如果你想要发射任何新的跟踪器,那么之后就无法再次发射。相反,只需向每个跟踪器供应商传递“null”:store[endpoint].next(null)。我编辑了最后一个返回语句,以便如果将“null”传递给BehaviorSubject,则会切换到仅完成的“EMPTY”可观察对象。这样,您之前的跟踪器将被取消订阅,并且监听器在传递新的Observable之前将不会收到任何数据。 - frido

1
如果您尝试像将订阅者移动到另一个可观察对象中一样做事情,那么您实际上并没有按照 RxJS 的意图进行操作。任何此类操纵基本上都是黑客行为。
如果您偶尔生成新的可观察对象(例如通过发出请求),并且希望某些订阅者始终订阅其中最新的对象,则以下是解决方案:
  private observables: Subject<Observable<Data>> = new Subject();

  getData(): Observable<Data> {
    return this.observables.pipe(switchAll());
  }

  onMakingNewRequest(newObservable: Observable<Data>) {
    this.observables.push(newObservable);
  }

通过将数据推送到this.observables,您可以公开单个可观察对象(通过getData()),客户端订阅该对象,但是您同时改变了用户所看到的实际数据源。

至于关闭连接和类似的事情,您的可观察对象(每个请求或其他内容创建的对象)应该在取消订阅时负责释放和关闭相关资源,这样您就不需要进行任何额外的处理,以前的可观察对象将在推送新对象的时候自动取消订阅。具体细节取决于您联系的后端服务。


你的第二句话听起来就像我正在寻找的,我感觉_Subject_ / _switchAll_组合正是我所需要的。我会深入研究这个,谢谢。 - Stphane

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接