从零开始实现IObservable<T>

39
反应式扩展库提供了许多辅助方法,用于将现有事件和异步操作转换为可观察对象,但是如何从头开始实现IObservable<T>?IEnumerable具有可爱的yield关键字,使其非常简单易行。那么,正确的实现IObservable<T>的方法是什么?我需要担心线程安全吗?我知道有支持在特定同步上下文中回调的功能,但这是否是我作为IObservable<T>作者需要担心的问题,还是内置的?更新:这是Brian F#解决方案的C#版本。
using System;
using System.Linq;
using Microsoft.FSharp.Collections;

namespace Jesperll
{
    class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
    {
        private FSharpMap<int, IObserver<T>> subscribers = 
                 FSharpMap<int, IObserver<T>>.Empty;
        private readonly object thisLock = new object();
        private int key;
        private bool isDisposed;

        public void Dispose()
        {
            Dispose(true);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing && !isDisposed)
            {
                OnCompleted();
                isDisposed = true;
            }
        }

        protected void OnNext(T value)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnNext(value);
            }
        }

        protected void OnError(Exception exception)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            if (exception == null)
            {
                throw new ArgumentNullException("exception");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnError(exception);
            }
        }

        protected void OnCompleted()
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnCompleted();
            }
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            lock (thisLock)
            {
                int k = key++;
                subscribers = subscribers.Add(k, observer);
                return new AnonymousDisposable(() =>
                {
                    lock (thisLock)
                    {
                        subscribers = subscribers.Remove(k);
                    }
                });
            }
        }
    }

    class AnonymousDisposable : IDisposable
    {
        Action dispose;
        public AnonymousDisposable(Action dispose)
        {
            this.dispose = dispose;
        }

        public void Dispose()
        {
            dispose();
        }
    }
}

编辑: 如果调用Dispose两次,请不要抛出ObjectDisposedException。


1
Wes Dyer现在在Channel9上发布了一段视频,讨论这些接口的契约。 - Benjol
1
(30秒后... http://channel9.msdn.com/posts/J.Van.Gogh/Reactive-Extensions-API-in-depth-Contract/)请参阅深入了解反应式扩展API的合约。 - Benjol
很酷 - 一定会看的 :) - Jesper Larsen-Ledet
当调用Dispose()两次时,不应该抛出ObjectDisposed异常。而是在已经调用dispose后再调用其他方法时,应该抛出ObjectDisposed异常。 - John Gietzen
@JohnGietzen 你说得完全正确。我已经编辑了代码以反映这一点。 - Jesper Larsen-Ledet
5个回答

11

官方文档不建议用户自己实现IObservable。相反,用户应该使用工厂方法Observable.Create

如果可能的话,请通过组合现有的操作符来实现新的操作符。否则,请使用Observable.Create来实现自定义操作符。

Observable.Create恰好是Reactive内部类AnonymousObservable的一个简单包装:

public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
{
    if (subscribe == null)
    {
        throw new ArgumentNullException("subscribe");
    }
    return new AnonymousObservable<TSource>(subscribe);
}

我不知道为什么他们没有公开他们的实现,但是无论如何。


正确。不要自己实现IObservable<T>IObserver<T> - Lee Campbell
嗨,李。我喜欢你关于RX的书籍形式,比官方文档更好的指南。 - Colonel Panic
1
欢呼。由于 Rx 现在已经开源,我希望能够帮助团队更新官方的代码/文档。 - Lee Campbell

9

说实话,我不确定这一切是否“正确”,但根据我的经验,它感觉相当不错。 这是F#代码,但希望您能感受到其味道。 它允许您“new up”一个源对象,然后调用Next/Completed/Error,并管理订阅并在源或客户端出现问题时尝试进行断言。

type ObservableSource<'T>() =     // '
    let protect f =
        let mutable ok = false
        try 
            f()
            ok <- true
        finally
            Debug.Assert(ok, "IObserver methods must not throw!")
            // TODO crash?
    let mutable key = 0
    // Why a Map and not a Dictionary?  Someone's OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over
    let mutable subscriptions = Map.empty : Map<int,IObserver<'T>>  // '
    let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnNext(x)))
    let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnCompleted()))
    let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnError(e)))
    let thisLock = new obj()
    let obs = 
        { new IObservable<'T> with       // '
            member this.Subscribe(o) =
                let k =
                    lock thisLock (fun () ->
                        let k = key
                        key <- key + 1
                        subscriptions <- subscriptions.Add(k, o)
                        k)
                { new IDisposable with 
                    member this.Dispose() = 
                        lock thisLock (fun () -> 
                            subscriptions <- subscriptions.Remove(k)) } }
    let mutable finished = false
    // The methods below are not thread-safe; the source ought not call these methods concurrently
    member this.Next(x) =
        Debug.Assert(not finished, "IObserver is already finished")
        next x
    member this.Completed() =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        completed()
    member this.Error(e) =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        error e
    // The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads
    member this.Value = obs

对于这里的优点和缺点,我会很感兴趣;我还没有机会查看devlabs的所有新Rx内容...

我的经验表明:

  • 订阅者不应该从订阅中抛出异常。当订阅者抛出异常时,观察者无法做出任何合理的反应。(这类似于事件。)最有可能的情况是异常将冒泡到顶级捕获处理程序或导致应用程序崩溃。
  • 源应该是“逻辑上单线程”的。我认为编写可以响应并发OnNext调用的客户端可能更加困难;即使每个单独的调用来自不同的线程,避免并发调用也是有帮助的。
  • 拥有一个强制实施一些“契约”的基础/助手类肯定是有用的。

我非常好奇人们是否可以展示更多这方面的具体建议。


1
谢谢,我尝试使用C#创建类似的东西,并最终使用了F# Map集合来避免在枚举期间锁定。另一个选择是使用Eric Lippert的不可变AVLTree之类的东西。我已经让自己相信,观察者有责任确保事件在适当的上下文中接收,而可观察对象应该始终在同一线程上引发事件(正如您所写的那样)。 - Jesper Larsen-Ledet

7

是的,yield关键字非常好用;也许会有类似于IObservable(OfT)的东西出现?[编辑:在Eric Meijer的PDC '09 talk中,他表示“是的,请看这个空间”来生成可观察序列的声明性yield。]

如果想要接近的内容(而不是自己编写),请查看“(not yet) 101 Rx Samples”维基的底部链接,团队建议使用Subject(T)类作为实现IObservable(OfT) 的“后端”。以下是他们的示例:

public class Order
{            
    private DateTime? _paidDate;

    private readonly Subject<Order> _paidSubj = new Subject<Order>();
    public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } }

    public void MarkPaid(DateTime paidDate)
    {
        _paidDate = paidDate;                
        _paidSubj.OnNext(this); // Raise PAID event
    }
}

private static void Main()
{
    var order = new Order();
    order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe

    order.MarkPaid(DateTime.Now);
}

在我看来,当你想要生成自己的可观察对象时,Subject 绝对是正确的选择。 - Lukas Cenovsky
2
只是一点小建议,这里更好的选择是使用 AsyncSubject<T> ,因为它可以为未来的订阅者保留最后一个值。在你的例子中,在实际支付事件发生之前必须先进行订阅。 - Nappy
@Nappy:我不知道AsyncSubject<T>,谢谢你提到它。 - Dan Abramov
@Nappy,你是不是想用 BehaviorSubject<T> 而不是 AsyncSubject<T> - Angshuman Agarwal

2
  1. 打开Reflector并查看。

  2. 观看一些C9视频- this 这个视频展示了如何“派生”选择器“combinator”

  3. 秘密是创建AnonymousObservable、AnonymousObserver和AnonymousDisposable类,它们只是解决你无法实例化接口的问题。它们不包含任何实现,因为你可以通过Actions和Funcs传递实现。

例如:

public class AnonymousObservable<T> : IObservable<T>
{
    private Func<IObserver<T>, IDisposable> _subscribe;
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        _subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _subscribe(observer);
    }
}

我会让你解决剩下的部分……这是一个非常好的理解练习。
这里有一个相关问题的很棒的小讨论线程

1
谢谢,但并没有真正帮到我。我已经查看过反编译工具和大部分 C9 的视频了。反编译工具只展示了实际实现,很难从中推导出有关线程等规则。此外,你所谓的“秘密”只是把正确实现的责任从实际的可观察类转移到了提供的 Func 上——它并没有揭示实现该 Func 的规则。因此,你什么也没告诉我,除了让我自己去找出其余部分 :) - Jesper Larsen-Ledet
1
知道了。老实说,我到目前为止大部分的努力都是试图编写所谓的“组合器”,而不是实际的源代码。你可以从这里对我的问题的回答中获取一些指导方针(目前获得“官方”答案的最佳地点):http://social.msdn.microsoft.com/Forums/en-US/rx/thread/79402dd3-009a-46db-9b55-06482e8cad0e - Benjol

2

关于这个实现,我只想提出一个评论:

在 .net fw 4 中引入并发集合后,最好使用ConcurrentDictioary而不是简单的Dictionary。

它可以避免对集合进行锁定处理。

adi.


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接