您可以通过编写一个小类来轻松实现此操作,该类包装了初始的AsyncSubject
import {AsyncSubject, Subject, Observable, Subscription} from 'rxjs/RX'
class SingleSubscriberObservable<T> {
private newSubscriberSubscribed = new Subject();
constructor(private sourceObservable: Observable<T>) {}
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
this.newSubscriberSubscribed.next();
return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
}
}
您可以在示例中尝试它:
const as = new AsyncSubject();
const single = new SingleSubscriberObservable(as)
let fired = false;
function setFired(label:string){
return ()=>{
if(fired == true) throw new Error("Multiple subscriptions executed");
console.log("FIRED", label);
fired = true;
}
}
function logDone(label: string){
return ()=>{
console.log(`${label} Will stop subscribing to source observable`);
}
}
const subscription1 = single.subscribe(setFired('First'), ()=>{}, logDone('First'));
const subscription2 = single.subscribe(setFired('Second'), ()=>{}, logDone('Second'));
const subscription3 = single.subscribe(setFired('Third'), ()=>{}, logDone('Third'));
setTimeout(()=>{
as.next(undefined);
as.complete();
}, 500)
这里的关键是这部分:
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
this.newSubscriberSusbscribed.next();
return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
}
每当有人调用subscribe时,我们将会向newSubscriberSubscribed主题发出信号。
当我们订阅底层的Observable时,我们使用的是
takeUntil(this.newSubscriberSubscribed)
这意味着当下一个订阅者调用时:
this.newSubscriberSubscribed.next()
之前返回的可观察对象将会完成。
因此,当新的订阅到来时,先前的订阅将会完成,这就是您所要求的结果。
应用程序的输出将是:
First Will stop subscribing to source observable
Second Will stop subscribing to source observable
FIRED Third
Third Will stop subscribing to source observable
编辑:
如果你想让最早订阅的用户保持订阅状态,而所有后续的订阅都立即完全接收(这样在最早的订阅者仍然订阅的情况下,没有其他人可以订阅)。你可以像这样做:
class SingleSubscriberObservable<T> {
private isSubscribed: boolean = false;
constructor(private sourceObservable: Observable<T>) {}
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
if(this.isSubscribed){
return Observable.empty().subscribe(next, error, complete);
}
this.isSubscribed = true;
var unsubscribe = this.sourceObservable.subscribe(next, error, complete);
return new Subscription(()=>{
unsubscribe.unsubscribe();
this.isSubscribed = false;
});
}
}
我们保留一个标志
this.isSusbscribed
来跟踪当前是否有人订阅。我们还返回一个自定义的订阅,可以用来在取消订阅时将此标志设置回 false。
每当有人尝试订阅时,如果我们将他们订阅到一个立即完成的空
Observable
中,输出将如下所示:
Second Will stop subscribing to source observable
Third Will stop subscribing to source observable
FIRED First
First Will stop subscribing to source observable