为什么我不应该实现IObservable<T>?

13

在阅读与反应式扩展相关的msdn文档时,我发现有一条建议说我不应该实现IObservable,而是应该使用Observable.Create...但在我看到这个建议的时候,我的项目已经有了一个ObservableImplementation<T>类,我已经在想要将事件转换为Observables的任何地方使用它作为IObservable源。

我已经阅读了System.Reactive中的AbstractObservable<T>实现,我没有发现他们的代码和我的代码之间有什么主要区别。那么实现IObservable有什么问题吗?我可以添加自己的属性等...

出于完整性的考虑,这里是我的实现,请告诉我是否做错了什么!

public sealed class ObservableImplementation<T> : IObservable<T>
{
    class Subscription : IDisposable
    {
        private readonly Action _onDispose;
        public Subscription(Action onDispose)
        {
            _onDispose = onDispose;
        }

        public void Dispose()
        {
            _onDispose();
        }
    }


    public void Raise(T value)
    {
        _observers.ForEach(o => o.OnNext(value));
    }
    public void Completion()
    {
        _observers.ForEach(o => o.OnCompleted());
        _observers.Clear();
    }

    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();  
    public IDisposable Subscribe(IObserver<T> observer)
    {
        var subscription = new Subscription(() => _observers.Remove(observer));
        _observers.Add(observer);
        return subscription;
    }
    public bool AnyObserverPresent { get { return _observers.Any(); } }
}

更重要的是,我的代码有什么问题吗?为什么他们建议不要实现IObservable? - TDaver
该主题是一个可观察的对象,可以通过调用OnNext、OnCompleted和ObError来触发其事件。从IEnumerable的角度来看,它基本上就是List。 - Richard Szalay
有没有“AnyObserverPresent”功能?我的某个事件参数有很多数据(通过反射收集:(),我只想在有人使用它时才创建它 - 并且不能使用“Lazy”,因为它也必须在WP7上工作。 - TDaver
@PaulBetts:我在做类似于你的RxUi的东西,线程安全性有多大的关注点? - TDaver
1
@TDaver 如果你正在做与UI相关的事情,那么你已经相当确定应该从UI线程中得到信号(例如,如果你正在从事件中进行OnNext操作等)。如果有一些地方没有得到信号,你需要通过ObserveOn确保它能够到达UI线程。 - Ana Betts
显示剩余5条评论
3个回答

23

我们不建议直接实现IObservable<T>的原因有几个。

其中一个原因是缺乏对观察者语法规则的保护。例如,您的序列可能表现出在OnCompleted调用后进行OnNext调用的行为,这是无效的。Observable.Create<T>方法和ObservableBase<T>基类型会自动处理这一问题,通过在接收到终止消息时自动分离观察者。因此,即使您的代码执行错误,观察者也不会看到格式不正确的序列。

顺便说一下,这类似于C#中的迭代器。手动实现IEnumerable<T>应该是这样的:当枚举器的MoveNext返回false(类似于OnCompleted)时,后续调用不会改变其想法并开始返回true(类似于OnNext):

如果MoveNext超过集合的末尾,则枚举器位于集合中的最后一个元素之后,并且MoveNext返回false。当枚举器处于此位置时,对MoveNext的后续调用也将返回false,直到调用Reset。(来源: MSDN)

使用C# 2.0或VB 11.0中的迭代器时,这些问题已经为您处理了。这与我们的Observable.Create<T>方法和ObservableBase<T>基类型类似。

与上面的讨论相关的另一个原因是清理。在从Dispose调用中返回订阅时,观察者是否不再看到任何消息?向观察者发送终止消息时,相关订阅的Dispose逻辑是否会自动调用?这两个问题都不容易得到正确的解决方法,所以我们的基本实现会处理这些问题。

另一个原因与我们的CurrentThreadScheduler有关,确保在该调度程序上运行时,Subscribe调用可以是异步的。实质上,我们需要检查是否需要在调用Subscribe期间在当前线程上安装一个跳板。我们不希望每个人都知道这个并做正确的事情。

就像其他人所指出的那样,在您的特定情况下,您正在构建一个主题。要么只使用我们的其中一个主题,要么将其包含在您自己的类型中(例如,如果您希望发送“观察者”方可被除接收“可观察”的方以外的其他方访问)。


1
如果有人希望暴露类似事件的行为,但又希望利用让事件发布者提供一个“取消订阅”标记来识别订阅的语义优势,您会推荐什么? - supercat

10

不应该实现 IObservable<T> 的原因与通常不实现 IEnumerable<T> 的原因相同,即有人很可能已经构建了你想要的东西。在这种情况下,你基本上重新实现了 Subject<T>

编辑:至于评论中的 Lazy 问题,我会这样实现:

var lazyObservable = Observable.Create<TFoo>(subj => { /*TODO: Implement Me to load from reflection*/ })
    .Multicast(new Subject<TFoo>())   // This means it'll only calc once
    .RefCount();    // This means it won't get created until someone Subscribes

谢谢,我会尽快查看这个懒加载的东西! - TDaver

6
最近Rx团队的博客文章提到了三个原因。由于这篇文章比较长,我只复制了相关部分。 强制执行契约

Observable.Create接受一个单一委托,该委托将成为生成IObservable实现上Subscribe方法的核心实现。我们在这个委托周围做了一些聪明的包装来强制执行观察者契约,以及其他一些事情(这就是为什么你不应该自己实现接口)。

可处理对象的包装器

返回的可处理对象有一个小的包装器,用于确保观察者在从Dispose调用返回后不会再被调用,即使调度程序可能还没有到一个好的停止点。(这是另一个你不应该手动实现IObservable接口的原因。哦,顺便说一下,还有更多!)

自动完成时处理 重点在于当向下游发送OnCompleted时,应用于源订阅的自动处理行为。 (这是手动实现IObservable强烈不建议的另一个原因。使用Observable.Create时,我们会为您处理此操作。)

谢谢,虽然我认为在我的最新实现中已经考虑到了这三点 :) - TDaver

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接