Rxjs可观察订阅数量

31

在我的Angular 2应用程序中,我有许多观察者和订阅。当然,在离开页面时应该取消订阅,但我想知道是否有可能获得活动订阅的数量,仅供调试信息或在我忘记取消订阅时使用。

在rxjs中是否有这样的信息可用?


我不知道(因此评论而不是回答)...但我知道,如果你在视图中使用async管道而不是订阅可观察对象,你就不需要取消订阅,因为框架会为你处理。 - Brocco
7个回答

21

可能有点晚了,但您可以利用rxjs-spy的帮助。

这个解决方案与所提出的方案等效,并且在我看来更易于维护。

我通常将其全局启用在main.ts中作为一种“启动并忘记”的策略。您只需执行以下操作:

  1. install rxjs-spy via your package manager

  2. import in main.ts the reference to the create function: import { create } from 'rxjs-spy';

  3. initialize the rxjs-spy in debug builds right after the angular initialization snippet:

    
    if (environment.production) {
        enableProdMode();
    }
    else {
        //we enable RXjs Spy on non production bulds only
        const spy = create();
        // we call show for two purposes: first is to log to the console an empty snapshot so we can see that everything is working as expected, then to suppress unused variable usage (the latter is a convention on mine)
        spy.show();
    }
    
    
  4. give your observables a name:

    
    import { tag } from 'rxjs-spy/operators';
    
    ...
    
    // This is a sample method which asks for a "Product" entity. Product and this.http is omitted as the focus is on tagging the observable
    public getProductById(productId: number): Observable<Product> {
        let params = new HttpParams()
            .append('productId', productId.toString())
            ;
        // we tag the returned observable with the name 'getProductById' (this is a convention on mine, you can choose whatsoever name)
        return this.http.get<Product>(this.baseUrl + "api/product", { params: params }).pipe(tag("getProductById"));
    }
    
    
  5. when you need to look at rxjs state, you can simply open the console window and use rxSpy.show() to have the current snapshot

你可以使用其他命令。一个非常详细的教程是rxjs Spy调试(截至2020年链接已失效)。另一个很好的教程是RxJS工具链。 (如果有人阅读此答案并成功解决格式问题,我会很高兴的)

https://blog.angularindepth.com/debugging-rxjs-4f0340286dd3 这篇文章不再可用了吗? - Budda
显然,自去年以来已经消失了。我会用另一个链接更新答案。谢谢。 - Yennefer

16

下面的实用函数可能有助于...

function subscriberCount<T>(sourceObservable: Observable<T>, description: string) {
  let counter = 0;
  return new Observable((subscriber: Subscriber<T>) => {
    const subscription = sourceObservable.subscribe(subscriber);
    counter++;
    console.log(`${description} subscriptions: ${counter}`);

    return () => {
      subscription.unsubscribe();
      counter--;
      console.log(`${description} subscriptions: ${counter}`);
    }
  });
}

像这样使用:

const timer$ = subscriberCount(Observable.timer(1000), 'Timer');

当订阅者数量发生变化时,控制台将记录日志


1
这应该是最佳答案。非常有用和优雅。 - Ray Suelzer
1
Observable.create已被弃用,您是否可以更新您的示例? - MeIr

4

实际上有一种更简单的方法:

你可以将你的可观察对象转换为 "BehaviorSubject",它们有一个名为 "observers" 的属性,可以给出订阅数量。

之前:

  let myObservable = somethingObservable;
  // Do something complicated to get the subscription count

之后:

let myObservable = new BehaviorSubject();
let count = myObservable.observers.length

请注意,如果您只需要知道是否至少有一个可观察对象,您也可以这样做:
myBehaviorSubject.observed

你有办法知道这些观察者是在哪里创建的吗?我知道我有一个内存泄漏(观察者无限增长),但我很难确定代码的哪个部分(哪个组件/服务)创建了泄漏的观察者。 - Sébastien Tromp
你有办法知道这些观察者是在哪里创建的吗?我遇到了一个内存泄漏的情况(观察者无限增长),但是我很难确定是代码的哪个部分(哪个组件/服务)创建了导致泄漏的观察者。 - undefined

3
在您的情况下,您可以很好地利用带有refCount计数的RxJS Subjects。您可以将源可观察对象提供给Subject,并让refCount管理其订阅。 如果没有观察者监听它,refCount将取消订阅您的源可观察对象。另一方面,如果观察者计数为0并且观察者订阅了它,则会创建源可观察对象的新实例。
Subject将充当观察者和源可观察对象之间的代理,并管理对其的订阅和取消订阅。本质上,当您的第一个观察者订阅您的Subject时,Subject将依次订阅您的源可观察对象(refCount从0变为1)。 Subjects允许多个观察者监听单播源可观察对象,使其成为多播。当观察者开始取消订阅并且refCount再次降至0时,Subject本身将取消订阅源可观察对象。
以下是更好的代码理解:

const {
  Observable
} = Rx;

let sourceObservable = Observable
  .create((observer) => {
    let count = 0;
    let interval = setInterval(() => {
      observer.next(count++)
    }, 700);

    setTimeout(() => {
      clearInterval(interval);
      observer.complete();
    }, 5500);

    return () => {
      clearInterval(interval);
      console.log('######## Source observable unsubscribed');
    }
  })
  .do((x) => console.log('#### Source emits: ' + x));

let subject = sourceObservable
  .share()
  //.do((x) => console.log('#### Subject emits: ' + x))
  ;

let pageOneObserver;
let pageTwoObserver;
let pageThreeObserver;

setTimeout(() => {
  console.log('pageOneObserver will subscribe');
  pageOneObserver = subject.subscribe({
    next: (x) => {
      console.log('pageOneObserver gets: ' + x);
    },
    complete: () => {
      console.log('pageOneObserver: complete');
    }
  });
}, 1000);

setTimeout(() => {
  console.log('pageTwoObserver will subscribe');
  pageTwoObserver = subject.subscribe({
    next: (x) => {
      console.log('pageTwoObserver gets: ' + x);
    },
    complete: () => {
      console.log('pageTwoObserver: complete');
    }
  });
}, 4000);

setTimeout(() => {
  console.log('pageOneObserver will unsubscribe');
  pageOneObserver.unsubscribe();
}, 7000);

setTimeout(() => {
  console.log('pageTwoObserver will unsubscribe');
  pageTwoObserver.unsubscribe();
}, 10000);

setTimeout(() => {
  console.log('pageThreeObserver will subscribe');
  pageThreeObserver = subject.subscribe({
    next: (x) => {
      console.log('pageThreeObserver gets: ' + x);
    },
    complete: () => {
      console.log('pageThreeObserver: complete');
    }
  });
}, 13000);

setTimeout(() => {
  console.log('pageThreeObserver will unsubscribe');
  pageThreeObserver.unsubscribe();
}, 16000);
<script src="https://unpkg.com/rxjs@5.1.1/bundles/Rx.min.js"></script>

有一些简单的方法可以创建主题。例如:
sourceObservable.share();
// is the same as
sourceObservable.publish().refCount();

sourceObservable.publish().refCount();
// is the same as
sourceObservable.multicast(new Rx.Subject()).refCount();

sourceObservable.publishReplay().refCount();
// is the same as
sourceObservable.multicast(new Rx.ReplaySubject(1)).refCount();

sourceObservable.publishBehavior().refCount();
// is the same as
sourceObservable.multicast(new Rx.BehaviorSubject(0)).refCount();

sourceObservable.publishLast().refCount();
// is the same as
sourceObservable.multicast(new Rx.AsyncSubject()).refCount();
sourceObservable.share();还内置了主题工厂,这意味着当源可观察对象在某一点完成时,我们必须创建一个新的源可观察对象实例,但只需使用您选择的主题的新实例即可完成。如果要使用Rx.Subject()之外的其他主题类型,并希望使您的可观察订阅真正可重用,则必须使用主题工厂(这只是返回要使用的任何主题的新实例的函数),如下所示:

const {
  Observable
} = Rx;

let sourceObservable = Observable
  .create((observer) => {
    let count = 0;
    let interval = setInterval(() => {
      observer.next(count++)
    }, 700);

    setTimeout(() => {
      clearInterval(interval);
      observer.complete();
    }, 5500);

    return () => {
      clearInterval(interval);
      console.log('######## Source observable unsubscribed');
    }
  })
  .do((x) => console.log('#### Source emits: ' + x));

/* You could return whatever subject instance you like here */
let subjectFactory = () => new Rx.ReplaySubject(1);

let subject = sourceObservable
 .multicast(subjectFactory)
 .refCount();
 //.do((x) => console.log('#### Subject emits: ' + x))
 ;

let pageOneObserver;
let pageTwoObserver;
let pageThreeObserver;

setTimeout(() => {
  console.log('pageOneObserver will subscribe');
  pageOneObserver = subject.subscribe({
    next: (x) => {
      console.log('pageOneObserver gets: ' + x);
    },
    complete: () => {
      console.log('pageOneObserver: complete');
    }
  });
}, 1000);

setTimeout(() => {
  console.log('pageTwoObserver will subscribe');
  pageTwoObserver = subject.subscribe({
    next: (x) => {
      console.log('pageTwoObserver gets: ' + x);
    },
    complete: () => {
      console.log('pageTwoObserver: complete');
    }
  });
}, 4000);

setTimeout(() => {
  console.log('pageOneObserver will unsubscribe');
  pageOneObserver.unsubscribe();
}, 7000);

setTimeout(() => {
  console.log('pageTwoObserver will unsubscribe');
  pageTwoObserver.unsubscribe();
}, 10000);

setTimeout(() => {
  console.log('pageThreeObserver will subscribe');
  pageThreeObserver = subject.subscribe({
    next: (x) => {
      console.log('pageThreeObserver gets: ' + x);
    },
    complete: () => {
      console.log('pageThreeObserver: complete');
    }
  });
}, 13000);

setTimeout(() => {
  console.log('pageThreeObserver will unsubscribe');
  pageThreeObserver.unsubscribe();
}, 16000);
<script src="https://unpkg.com/rxjs@5.1.1/bundles/Rx.min.js"></script>

如果还有不清楚的地方,请随时提问。


0

您可以使用Subscription类,它带有addremoveunsubscribe方法,以便您管理订阅。 _subscriptions属性的长度基本上告诉您有多少个活动订阅。这种方法在Angular的onDestroy中非常适用于可观察的取消订阅。

import { interval, Subscription } from "rxjs";

const subs = new Subscription();
const sub = interval(1000).subscribe();

subs.add(sub);
console.log(sub);

0
我发现处理类似事情的最好方法是编写一个操作符。
someValue$.pipe(
  doOnSubscriberCountChange((newSubscriberCount) => {
    console.log(newSubscriberCount);
    if (newSubscriberCount === 1) {
      // Do something when subscriber count changes from 0 to 1
    } else if (newSubscriberCount === 0) {
      // Do something when subscriber count changes from 1 to 0
    }
  }),
).subscribe();

import { Observable, OperatorFunction, Subscriber } from 'rxjs';

export function doOnSubscriberCountChange<TValue>(
  handleNewSubscriberCount: (newSubscriberCount: number) => void,
): OperatorFunction<TValue, TValue> {
  return (source$) => {
    let subscriberCount = 0;

    return new Observable((subscriber: Subscriber<TValue>) => {
      const subscription = source$.subscribe(subscriber);
      handleNewSubscriberCount(++subscriberCount);

      return () => {
        subscription.unsubscribe();
        handleNewSubscriberCount(--subscriberCount);
      };
    });
  };
}

-3

在组件销毁后使用this操作来取消订阅

示例:

ngOnInit() {
    interval(1000)
        .pipe(
            untilComponentDestroyed(this)       // <-- use the operator
        )
        .subscribe();
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接