如何在RxJava的自定义Observable中接收观察者取消订阅的通知

20

我正在尝试将基于监听器模式的API包装成可观察对象。我的代码大致如下。

def myObservable = Observable.create({ aSubscriber ->
    val listener = {event -> 
      aSubscriber.onNext(event);                
    }
    existingEventSource.addListener(listener)
})

然而,我希望我的可观察对象在观察者调用subscription.unscribe()时立即从底层的existingEventSource中删除侦听器。我该如何实现这个目标?

1个回答

31

Subscriber抽象类实际上有一个add方法,它允许您添加Subscription,这些订阅将随着订阅者一起取消订阅。

def myObservable = Observable.create({ aSubscriber ->
    val listener = {event -> 
      aSubscriber.onNext(event);                
    }
    existingEventSource.addListener(listener)

    // Adds a lambda to be executed when the Subscriber un-subscribes from your Observable
    aSubscriber.add(Subscriptions.create(() -> existingEventSource.removeListener(listener)));
})

aSubscriber视为订阅了您的ObservableObserver;我们将其称为Subscriber。只要Subscriber仍订阅ObservableObservable就可以发出值。但是当Subscriber取消订阅时,它应该停止。但是如果我们想在Subscriber取消订阅时得到通知,我们可以注册一个Action以在其发生时运行。这就是add方法的用途。如评论中@ dwursteisen所提到的;实质上是注册一个在Subscriber取消订阅时执行的lambda表达式。

还可以在不同的调度程序上取消订阅Subscription。请参见rxandroid项目中的MainThreadSubscription,以了解如何实现它。

以下是在主线程上使用它进行取消订阅的示例

aSubscriber.add(new MainThreadSubscription() {
    @Override
    protected void onUnsubscribe() {
        existingEventSource.removeListener(listener);
    }
});

Lambda可以在不同的调度程序上运行吗? - Kirill Rakhman
@KirillRakhman 我已经编辑了回应,附上了一个示例以及你可以找到类似代码的地方。 - Miguel
unsubscribeOn 运算符适用于这个吗? - Kirill Rakhman
unsubscribeOn 是不同的。例如,如果您想在 UI 线程上取消订阅,则需要相应地实现 Scheduler 接口,或者您可以使用 rxandroid 项目中位于 此处 的 AndroidSchedulers.mainThread()。 - Miguel
有人知道如何在 RxJS 5(适用于 JavaScript)中实现相同的事情吗? - HipsterZipster
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接