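In my Angular 2 application I have many observers and subscriptions. Of course I should unsubscribe when I leave the page, but I am wondering whether it is possible to get the number of active subscriptions, just for debugging information or for when I forget to unsubscribe.
Is such information available in rxjs?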
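It may be a bit late, but you could use rxjs-spy.
This solution is equivalent to the one proposed, and in my opinion easier to maintain.
I usually enable it globally in main.ts as a "fire and forget" strategy. You simply have to: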
Install rxjs-spy via your package manager.
In main.ts, import the create function: import { create } from 'rxjs-spy';
Initialize rxjs-spy in debug builds, right after the Angular initialization snippet:
if (environment.production) {
  enableProdMode();
} else {
  // Enable rxjs-spy on non-production builds only
  const spy = create();
  // Calling show() serves two purposes: it logs an empty snapshot to the console so we can see that everything works as expected, and it avoids an unused-variable warning (the latter is a convention of mine)
  spy.show();
}
Give your observables a name:
import { tag } from 'rxjs-spy/operators';
...
// Sample method that requests a "Product" entity. Product and this.http are omitted, as the focus is on tagging the observable
public getProductById(productId: number): Observable<Product> {
  const params = new HttpParams().append('productId', productId.toString());
  // Tag the returned observable with the name 'getProductById' (a convention of mine; you can choose any name)
  return this.http.get<Product>(this.baseUrl + "api/product", { params: params }).pipe(tag("getProductById"));
}
When you need to look at the rxjs state, simply open the console window and call rxSpy.show() to get the current snapshot.
The following utility function may help...
function subscriberCount<T>(sourceObservable: Observable<T>, description: string) {
  let counter = 0;
  return new Observable((subscriber: Subscriber<T>) => {
    const subscription = sourceObservable.subscribe(subscriber);
    counter++;
    console.log(`${description} subscriptions: ${counter}`);
    return () => {
      subscription.unsubscribe();
      counter--;
      console.log(`${description} subscriptions: ${counter}`);
    };
  });
}
Use it like this:
const timer$ = subscriberCount(Observable.timer(1000), 'Timer');
The console will log a message whenever the number of subscribers changes.
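Observable.create is deprecated; could you update your example? - MeIr
There is actually a simpler way:
You can convert your observable to a "BehaviorSubject", which has a property called "observers" that gives you the number of subscriptions.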
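Before:
let myObservable = somethingObservable;
// Do something complicated to get the subscription count
After:
let myObservable = new BehaviorSubject(initialValue); // a BehaviorSubject requires an initial value
let count = myObservable.observers.length;
In newer RxJS versions (7.2 and later), myBehaviorSubject.observed tells you whether there is at least one subscriber.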
const {
  Observable
} = Rx;
let sourceObservable = Observable
  .create((observer) => {
    let count = 0;
    let interval = setInterval(() => {
      observer.next(count++)
    }, 700);
    setTimeout(() => {
      clearInterval(interval);
      observer.complete();
    }, 5500);
    return () => {
      clearInterval(interval);
      console.log('######## Source observable unsubscribed');
    }
  })
  .do((x) => console.log('#### Source emits: ' + x));
let subject = sourceObservable
  .share()
  //.do((x) => console.log('#### Subject emits: ' + x))
;
let pageOneObserver;
let pageTwoObserver;
let pageThreeObserver;
setTimeout(() => {
  console.log('pageOneObserver will subscribe');
  pageOneObserver = subject.subscribe({
    next: (x) => {
      console.log('pageOneObserver gets: ' + x);
    },
    complete: () => {
      console.log('pageOneObserver: complete');
    }
  });
}, 1000);
setTimeout(() => {
  console.log('pageTwoObserver will subscribe');
  pageTwoObserver = subject.subscribe({
    next: (x) => {
      console.log('pageTwoObserver gets: ' + x);
    },
    complete: () => {
      console.log('pageTwoObserver: complete');
    }
  });
}, 4000);
setTimeout(() => {
  console.log('pageOneObserver will unsubscribe');
  pageOneObserver.unsubscribe();
}, 7000);
setTimeout(() => {
  console.log('pageTwoObserver will unsubscribe');
  pageTwoObserver.unsubscribe();
}, 10000);
setTimeout(() => {
  console.log('pageThreeObserver will subscribe');
  pageThreeObserver = subject.subscribe({
    next: (x) => {
      console.log('pageThreeObserver gets: ' + x);
    },
    complete: () => {
      console.log('pageThreeObserver: complete');
    }
  });
}, 13000);
setTimeout(() => {
  console.log('pageThreeObserver will unsubscribe');
  pageThreeObserver.unsubscribe();
}, 16000);
<script src="https://unpkg.com/rxjs@5.1.1/bundles/Rx.min.js"></script>
sourceObservable.share();
// is the same as
sourceObservable.publish().refCount();
sourceObservable.publish().refCount();
// is the same as
sourceObservable.multicast(new Rx.Subject()).refCount();
sourceObservable.publishReplay(1).refCount();
// is the same as
sourceObservable.multicast(new Rx.ReplaySubject(1)).refCount();
sourceObservable.publishBehavior(0).refCount();
// is the same as
sourceObservable.multicast(new Rx.BehaviorSubject(0)).refCount();
sourceObservable.publishLast().refCount();
// is the same as
sourceObservable.multicast(new Rx.AsyncSubject()).refCount();
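sourceObservable.share(); also has the subject factory built in. This means that once the source observable has completed, a later subscriber needs a new subscription to the source, and share() sets that up with a fresh subject instance. If you want to use a subject type other than Rx.Subject() and still want your observable subscription to be truly reusable, you have to use a subject factory (which is just a function that returns a new instance of whatever subject you want to use), as shown below: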
const {
  Observable
} = Rx;
let sourceObservable = Observable
  .create((observer) => {
    let count = 0;
    let interval = setInterval(() => {
      observer.next(count++)
    }, 700);
    setTimeout(() => {
      clearInterval(interval);
      observer.complete();
    }, 5500);
    return () => {
      clearInterval(interval);
      console.log('######## Source observable unsubscribed');
    }
  })
  .do((x) => console.log('#### Source emits: ' + x));
/* You could return whatever subject instance you like here */
let subjectFactory = () => new Rx.ReplaySubject(1);
let subject = sourceObservable
  .multicast(subjectFactory)
  .refCount()
  //.do((x) => console.log('#### Subject emits: ' + x))
;
let pageOneObserver;
let pageTwoObserver;
let pageThreeObserver;
setTimeout(() => {
  console.log('pageOneObserver will subscribe');
  pageOneObserver = subject.subscribe({
    next: (x) => {
      console.log('pageOneObserver gets: ' + x);
    },
    complete: () => {
      console.log('pageOneObserver: complete');
    }
  });
}, 1000);
setTimeout(() => {
  console.log('pageTwoObserver will subscribe');
  pageTwoObserver = subject.subscribe({
    next: (x) => {
      console.log('pageTwoObserver gets: ' + x);
    },
    complete: () => {
      console.log('pageTwoObserver: complete');
    }
  });
}, 4000);
setTimeout(() => {
  console.log('pageOneObserver will unsubscribe');
  pageOneObserver.unsubscribe();
}, 7000);
setTimeout(() => {
  console.log('pageTwoObserver will unsubscribe');
  pageTwoObserver.unsubscribe();
}, 10000);
setTimeout(() => {
  console.log('pageThreeObserver will subscribe');
  pageThreeObserver = subject.subscribe({
    next: (x) => {
      console.log('pageThreeObserver gets: ' + x);
    },
    complete: () => {
      console.log('pageThreeObserver: complete');
    }
  });
}, 13000);
setTimeout(() => {
  console.log('pageThreeObserver will unsubscribe');
  pageThreeObserver.unsubscribe();
}, 16000);
<script src="https://unpkg.com/rxjs@5.1.1/bundles/Rx.min.js"></script>
If anything is still unclear, feel free to ask.
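You can use the Subscription class, which has add, remove and unsubscribe methods, to manage your subscriptions. The length of its private _subscriptions property basically tells you how many active subscriptions there are. This approach works well for unsubscribing from observables in Angular's ngOnDestroy.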
import { interval, Subscription } from "rxjs";
const subs = new Subscription();
const sub = interval(1000).subscribe();
subs.add(sub);
console.log(subs); // log the parent Subscription to inspect the child subscriptions it holds
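Since _subscriptions is a private property, a small wrapper that relies only on the public unsubscribe() method and closed flag may be a safer way to get a count for debugging. The following is a minimal sketch under that assumption: SubscriptionTracker, SubscriptionLike and FakeSubscription are hypothetical names introduced here for illustration and are not part of RxJS. The interface is redeclared locally so the sketch has no dependencies; a real RxJS Subscription should fit the same shape.

```typescript
// Minimal structural type: anything with unsubscribe() and a `closed` flag.
// Redeclared here (assumption) so the sketch does not depend on RxJS.
interface SubscriptionLike {
  readonly closed: boolean;
  unsubscribe(): void;
}

// Hypothetical helper, not an RxJS API: remembers every subscription it is
// given and reports how many of them are still open.
class SubscriptionTracker {
  private tracked: SubscriptionLike[] = [];

  // Register a subscription and hand it back so the call can wrap subscribe().
  track<T extends SubscriptionLike>(subscription: T): T {
    this.tracked.push(subscription);
    return subscription;
  }

  // Drop subscriptions that were closed elsewhere, then count the rest.
  activeCount(): number {
    this.tracked = this.tracked.filter((subscription) => !subscription.closed);
    return this.tracked.length;
  }

  // Close everything that is still open, e.g. from ngOnDestroy.
  unsubscribeAll(): void {
    this.tracked.forEach((subscription) => subscription.unsubscribe());
    this.tracked = [];
  }
}

// Stand-in for a real subscription, used only to exercise the tracker.
class FakeSubscription implements SubscriptionLike {
  closed = false;
  unsubscribe(): void {
    this.closed = true;
  }
}

const tracker = new SubscriptionTracker();
const first = tracker.track(new FakeSubscription());
tracker.track(new FakeSubscription());
console.log(tracker.activeCount()); // 2
first.unsubscribe();
console.log(tracker.activeCount()); // 1
tracker.unsubscribeAll();
console.log(tracker.activeCount()); // 0
```

In a component, the idea would be to wrap each subscribe() call in track(...) and call unsubscribeAll() on destroy; a non-zero activeCount() at that point would hint at a forgotten unsubscribe.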
someValue$.pipe(
  doOnSubscriberCountChange((newSubscriberCount) => {
    console.log(newSubscriberCount);
    if (newSubscriberCount === 1) {
      // Do something when subscriber count changes from 0 to 1
    } else if (newSubscriberCount === 0) {
      // Do something when subscriber count changes from 1 to 0
    }
  }),
).subscribe();
import { Observable, OperatorFunction, Subscriber } from 'rxjs';
export function doOnSubscriberCountChange<TValue>(
  handleNewSubscriberCount: (newSubscriberCount: number) => void,
): OperatorFunction<TValue, TValue> {
  return (source$) => {
    let subscriberCount = 0;
    return new Observable((subscriber: Subscriber<TValue>) => {
      const subscription = source$.subscribe(subscriber);
      handleNewSubscriberCount(++subscriberCount);
      return () => {
        subscription.unsubscribe();
        handleNewSubscriberCount(--subscriberCount);
      };
    });
  };
}
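The 0 to 1 and 1 to 0 transitions handled by the operator above follow the same reference-counting idea that refCount() is built on. The sketch below isolates that bookkeeping without any RxJS dependency; createRefCounter and its callbacks are hypothetical names chosen for illustration, not an existing API.

```typescript
// Hypothetical helper, not an RxJS API: counts holders and fires callbacks
// on the 0 -> 1 and 1 -> 0 transitions.
function createRefCounter(onFirst: () => void, onLast: () => void) {
  let count = 0;
  return {
    // Current number of holders.
    count(): number {
      return count;
    },
    // Returns a release function, mirroring how subscribing returns a teardown.
    acquire(): () => void {
      if (++count === 1) {
        onFirst();
      }
      let released = false;
      return () => {
        if (released) {
          return; // releasing twice must not decrement twice
        }
        released = true;
        if (--count === 0) {
          onLast();
        }
      };
    },
  };
}

const events: string[] = [];
const counter = createRefCounter(
  () => { events.push('start'); },
  () => { events.push('stop'); }
);
const releaseA = counter.acquire(); // first holder: onFirst runs
const releaseB = counter.acquire(); // second holder: no callback
releaseA();
releaseA(); // ignored, already released
releaseB(); // last holder gone: onLast runs
console.log(events, counter.count()); // [ 'start', 'stop' ] 0
```

Guarding against a double release keeps the count from drifting below the real number of holders, which is the kind of mismatch that makes a leak counter misleading.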
Use this operator to unsubscribe once the component is destroyed.
Example:
ngOnInit() {
  interval(1000)
    .pipe(
      untilComponentDestroyed(this) // <-- use the operator
    )
    .subscribe();
}
If you use the async pipe instead of subscribing to observables, you don't need to unsubscribe, because the framework handles it for you. - Brocco