如何从AsyncSubject订阅一个元素并确保仅订阅一次(消费者模式)

9
rxjs5 中,我有一个 AsyncSubject 并希望多次订阅它,但只有一个订阅者应该接收到 next() 事件。所有其他订阅者(如果他们还没有取消订阅)应立即获得 complete() 事件而不是 next()
示例:
let fired = false;
let as = new AsyncSubject();

const setFired = () => {
    if (fired == true) throw new Error("Multiple subscriptions executed");
    fired = true;
}

let subscription1 = as.subscribe(setFired);
let subscription2 = as.subscribe(setFired);

// note that subscription1/2 could be unsubscribed from in the future
// and still only a single subscriber should be triggered

setTimeout(() => {
    as.next(undefined);
    as.complete();
}, 500);

主题是通过设计进行多播,因此它们的所有订阅者都将接收事件。我不确定 RxJs 是否支持您正在尝试做的事情。这感觉像是一个 X-Y 问题 - 您能否给我们更多关于您要实现的确切目标的见解? - Thorn G
2个回答

1

您可以通过编写一个小类来轻松实现此操作,该类包装了初始的AsyncSubject

import {AsyncSubject, Subject, Observable, Subscription} from 'rxjs/RX'

class SingleSubscriberObservable<T> {
    private newSubscriberSubscribed = new Subject();

    constructor(private sourceObservable: Observable<T>) {}

    subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
        this.newSubscriberSubscribed.next();
        return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
    }
}

您可以在示例中尝试它:

const as = new AsyncSubject();
const single = new SingleSubscriberObservable(as)

let fired = false;

function setFired(label:string){
    return ()=>{
        if(fired == true) throw new Error("Multiple subscriptions executed");
        console.log("FIRED", label);
        fired = true;
    }
}

function logDone(label: string){
    return ()=>{
       console.log(`${label} Will stop subscribing to source observable`);
    }
}

const subscription1 = single.subscribe(setFired('First'), ()=>{}, logDone('First'));
const subscription2 = single.subscribe(setFired('Second'), ()=>{}, logDone('Second'));
const subscription3 = single.subscribe(setFired('Third'), ()=>{}, logDone('Third'));

setTimeout(()=>{
    as.next(undefined);
    as.complete();
}, 500)

这里的关键是这部分:

subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
    this.newSubscriberSusbscribed.next();
    return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
}

每当有人调用subscribe时,我们将会向newSubscriberSubscribed主题发出信号。
当我们订阅底层的Observable时,我们使用的是
takeUntil(this.newSubscriberSubscribed)

这意味着当下一个订阅者调用时:

this.newSubscriberSubscribed.next()

之前返回的可观察对象将会完成。

因此,当新的订阅到来时,先前的订阅将会完成,这就是您所要求的结果。

应用程序的输出将是:

First Will stop subscribing to source observable
Second Will stop subscribing to source observable
FIRED Third
Third Will stop subscribing to source observable

编辑:

如果你想让最早订阅的用户保持订阅状态,而所有后续的订阅都立即完全接收(这样在最早的订阅者仍然订阅的情况下,没有其他人可以订阅)。你可以像这样做:

class SingleSubscriberObservable<T> {
    private isSubscribed: boolean = false;

    constructor(private sourceObservable: Observable<T>) {}

    subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
        if(this.isSubscribed){
            return Observable.empty().subscribe(next, error, complete);    
        }
        this.isSubscribed = true;
        var unsubscribe = this.sourceObservable.subscribe(next, error, complete);
        return new Subscription(()=>{
            unsubscribe.unsubscribe();
            this.isSubscribed = false;
        });
    }
}

我们保留一个标志 this.isSusbscribed 来跟踪当前是否有人订阅。我们还返回一个自定义的订阅,可以用来在取消订阅时将此标志设置回 false。
每当有人尝试订阅时,如果我们将他们订阅到一个立即完成的空 Observable 中,输出将如下所示:
Second Will stop subscribing to source observable
Third Will stop subscribing to source observable
FIRED First
First Will stop subscribing to source observable

this.subscriptions.next() 是未知的,因为在您的示例中没有 this.subscriptions 字段。 - Dynalon
我认为你的示例没有正确处理如果订阅者在.next()事件之前取消订阅的情况。请看我的其他评论:“不应该是第一个订阅者触发,而是最早仍在订阅的订阅者触发”。 - Dynalon
我进行了编辑,应该是 this.newSubscriberSubscribed,这是之前修改名称时留下的遗漏。 - Daniel Tabuenca
从你的问题中并不清楚那是你想要的行为。我编辑了答案,包括一个不同版本的SingleSubscriberObservable,这将使第一个订阅者成为唯一的订阅者。 - Daniel Tabuenca

0
最简单的方法是将您的AsyncSubject包装在另一个对象中,该对象处理仅调用1个订阅者的逻辑。假设您只想调用第一个订阅者,则下面的代码应该是一个很好的起点。
let as = new AsyncSubject();

const createSingleSubscriberAsyncSubject = as => {
    // define empty array for subscribers
    let subscribers = [];

    const subscribe = callback => {
        if (typeof callback !== 'function') throw new Error('callback provided is not a function');

        subscribers.push(callback);

        // return a function to unsubscribe
        const unsubscribe = () => { subscribers = subscribers.filter(cb => cb !== callback); };
        return unsubscribe;
    };

    // the main subscriber that will listen to the original AS
    const mainSubscriber = (...args) => {
        // you can change this logic to invoke the last subscriber
        if (subscribers[0]) {
            subscribers[0](...args);
        }
    };

    as.subscribe(mainSubscriber);

    return {
        subscribe,
        // expose original AS methods as needed
        next: as.next.bind(as),
        complete: as.complete.bind(as),
    };
};

// use

const singleSub = createSingleSubscriberAsyncSubject(as);

// add 3 subscribers
const unsub1 = singleSub.subscribe(() => console.log('subscriber 1'));
const unsub2 = singleSub.subscribe(() => console.log('subscriber 2'));
const unsub3 = singleSub.subscribe(() => console.log('subscriber 3'));

// remove first subscriber
unsub1();

setTimeout(() => {
    as.next(undefined);
    as.complete();
    // only 'subscriber 2' is printed
}, 500);

第一个订阅者不正确,因为订阅者可以取消订阅。应该写成“仍然订阅的最早订阅者”。 - Dynalon
现在它正在执行的操作是将订阅者按顺序添加到数组中,如果有任何人取消订阅,则会从数组中删除该订阅者,因此数组中的第一个元素始终是仍然订阅的最早的元素。如果这不是您的意思,请告诉我。 - gafi
一个小注释:您的解决方案没有将 .complete() 发送给其他订阅者,但并不是每个人都需要它,我知道如何添加它,所以我将授予您赏金。 - Dynalon
我的答案更简洁,发送 .complete() 更高效(无需搜索数组),并且更符合 RX 的惯用法。哎呀... - Daniel Tabuenca
告诉我当 .complete() 被调用时你希望其他订阅者做什么,为了完整性我会添加它。 - gafi
@dtabuenc 自从您编辑了答案以来,它受到了很好的赞赏,我认为您的答案也是一个好答案。但是必须决定在哪里授予赏金。如果可以分割,我会这样做:( 必须说,我没有时间测试您的代码,所以我只能根据代码阅读来判断。 - Dynalon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接