使用IObservable代替事件

11

最近我一直在了解 IObservable。到目前为止,我已经查看了各种 Stack Overflow 问题,并观看了一些视频来了解它们的功能。整个“推送”机制我认为非常棒,但我仍然在尝试弄清楚每件事情的具体含义。从我的阅读中,我猜想 IObservable 在某种程度上是可以“被观察”的东西,而 IObservers 则是“观察者”。

现在我要尝试在我的应用程序中实现此功能。在开始之前,我有几件事情需要梳理清楚。我看到 IObservable 是 IEnumerable 的相反,但是在我的特定实例中,我实在找不到任何可以将其纳入我的应用程序的地方。

目前,我大量使用事件,以至于我发现“管道”正在变得难以管理。我认为,IObservable 可以帮助我解决这个问题。

考虑以下设计,这是我在我的应用程序中包装 I/O 的方式(FYI,我通常必须处理字符串):

我有一个名为 IDataIO 的基本接口:

public interface IDataIO
{
  event OnDataReceived;
  event OnTimeout:
  event OnTransmit;
}

目前,我有三个实现该接口的类,每个类都以某种方式使用异步方法调用,从而引入了某种类型的多线程处理:

public class SerialIO : IDataIO;
public class UdpIO : IDataIO;
public class TcpIO : IDataIO;

每个类都有一个单独的实例,被包装到我的最终类IO中(也实现了IDataIO - 符合我的策略模式):

public class IO : IDataIO
{
  public SerialIO Serial;
  public UdpIO Udp;
  public TcpIO Tcp;
}
我已经使用策略模式来封装这三个类,所以在运行时更改不同的 IDataIO 实例时对最终用户是“不可见”的。你可以想象一下,这导致了背景中相当多的“事件管道”。
那么,在我的情况下,我如何利用“推送”通知?我想要将数据简单地推送给任何感兴趣的人,而不是订阅事件(例如 DataReceived 等)。我有点不确定从哪里开始。我仍在尝试使用 Subject 的想法/泛型类,以及它的各种形式(ReplaySubject/AsynSubject/BehaviourSubject)。请有经验的人给我指点一下(也许参考我的设计)?或者这是否根本不适合使用 IObservable
PS. 请随意纠正我任何“误解” :)
3个回答

9

Observables非常适合表示数据流,因此您的事件可以很好地模拟observable模式,例如IObservable<byte>或IObservable<byte[]>。您还可以获得方便的OnError和OnComplete。

在实现方面,针对您的具体情况很难说,但我们经常使用Subject<T>作为底层源,并调用OnNext来推送数据。可能是这样的:

// Using a subject is probably the easiest way to push data to an Observable
// It wraps up both IObservable and IObserver so you almost never use IObserver directly
private readonly Subject<byte> subject = new Subject<byte>();

private void OnPort_DataReceived(object sender, EventArgs e)
{
    // This pushes the data to the IObserver, which is probably just a wrapper
    // around your subscribe delegate is you're using the Rx extensions
    this.subject.OnNext(port.Data); // pseudo code 
}

您可以通过属性公开主题:
public IObservable<byte> DataObservable
{
    get { return this.subject; } // Or this.subject.AsObservable();
}

你可以用 IObservable<T> 替换 IDataIO 上的 DataReceived 事件,并且让每个策略类以他们需要的方式处理数据并推送到 Subject<T> 中。
在另一方面,无论谁订阅 Observable 都能像使用事件一样(只需使用一个 Action<byte[]>)来处理它,或者你可以在流上执行一些非常有用的工作,例如 SelectWhereBuffer 等等。
private IDataIO dataIo = new ...

private void SubscribeToData()
{ 
    dataIo.DataObservable.Buffer(16).Subscribe(On16Bytes);
}

private void On16Bytes(IList<byte> bytes)
{
    // do stuff
}
ReplaySubject/ConnectableObservable在你知道订阅者会迟到但仍需要赶上所有事件时非常有用。源缓存它推送的所有内容并为每个订阅者重播所有内容。只有你可以确定这是否是你实际需要的行为(但要小心,因为它会缓存所有内容,这显然会增加内存使用量)。
当我学习Rx时,我发现http://leecampbell.blogspot.co.uk/关于Rx的博客系列对理解理论非常有帮助(这些文章现在有点过时,API已经改变了,所以要注意)。

嗨,RichK,你能详细说明一下Subject属性吗?这个属性是如何声明的?对于这个类的用户来说,他们应该如何“订阅”IObservable DataReceived - Simon
@Simon,我做了一些编辑,请告诉我你是否还不确定 :) - RichK
抱歉,代码还不太对 - 我忘记给那个属性命名了![已修复] - RichK
在我开始编码之前,还有一件事情需要注意的是,我主要处理字符串信息,它们不需要缓冲字节流。它们是完整的,我只需要处理它们。在这种情况下,观察者模式仍然适用吗? - Simon
1
听起来你需要学习一下关于可观察对象和响应式模式的知识。响应式是一种奇特的编程方式,它有什么用处?在什么情况下使用比较合适?我强烈推荐这个系列文章:http://www.introtorx.com/content/v1.0.10621.0/01_WhyRx.html#WhyRx - Gusdor
显示剩余2条评论

6

这绝对是使用可观察对象的理想案例。 IO 类可能会看到最大的改进。 首先,让我们改变接口以使用可观察对象,并看看组合类变得多么简单。

public interface IDataIO
{
    //you will have to fill in the types here.  Either the event args
    //the events provide now or byte[] or something relevant would be good.
    IObservable<???> DataReceived;
    IObservable<???> Timeout;
    IObservable<???> Transmit;
}

public class IO : IDataIO
{
    public SerialIO Serial;
    public UdpIO Udp;
    public TcpIO Tcp;

    public IObservable<???> DataReceived
    {
        get 
        {
            return Observable.Merge(Serial.DataReceived,
                                    Udp.DataReceived,
                                    Tcp.DataReceived);
        }
    }

    //similarly for other two observables
}

顺便提一下:你可能会注意到我更改了接口成员名称。在.NET中,事件通常被命名为<event name>,并且引发它们的函数被称为On<event name>

对于生产类,您有几个选项取决于实际来源。假设您正在使用.NET SerialPort类中的SerialIO,并且DataReceived返回一个IObservable<byte[]>。由于SerialPort已经有一个数据接收事件,您可以直接使用它来创建所需的可观察对象。

public class SerialIO : IDataIO
{
    private SerialPort _port;

    public IObservable<byte[]> DataRecived
    {
        get
        {
            return Observable.FromEventPattern<SerialDataReceivedEventHandler,
                                               SerialDataReceivedEventArgs>(
                        h => _port.DataReceived += h,
                        h => _port.DataReceived -= h)
                   .Where(ep => ep.EventArgs.EventType == SerialData.Chars)
                   .Select(ep =>
                           {
                              byte[] buffer = new byte[_port.BytesToRead];
                              _port.Read(buffer, 0, buffer.Length);
                              return buffer;
                           });
        }
    }
}

如果您没有现有的事件源,您可能需要像RichK建议的那样使用主题。 他的答案很好地涵盖了这种用法模式,因此我不会在此重复。

您没有展示如何使用此接口,但根据用例,将这些类上的其他函数返回IObservable本身并完全放弃这些“事件”可能更有意义。 使用基于事件的异步模式,您必须将事件与调用触发工作的函数分开,但是使用可观察对象,您可以从函数中返回它们,以便更明显地订阅您要订阅的内容。 这种方法还允许从每个调用返回的可观察对象发送OnErrorOnCompleted消息以表示操作的结束。 基于您使用的合并类,我不认为这在这种特定情况下有用,但这是需要记住的事情。


谢谢,这里有一些很好的信息。关于Merge()语句 - 它将一系列可观测对象合并为一个 - 在我的应用中,我只会同时使用其中之一(串行/UDP/TCP),并允许用户在不同接口之间切换(因此我在事件处理方面存在困境)。在这里推荐合并这些可观测对象吗?感谢提供异步串行事件的链接 :) - Simon
@Simon 当你合并这些包装类的可观察对象时,可以应用一个Where来检查“CurrentSource”或类似属性以过滤掉不需要的消息。请注意,这里的“包装类”指的是被RxJS等响应式编程库封装的类。 - Gideon Engelberth
1
@Simon 或者你可以(最好)停止在其他可观察对象上生成消息,这样 Merge 一次只接收一个。如果是这种情况,使用 Switch 可能更好。 - yamen

0

使用 IObservable 而不是事件

如果只对属性更改感兴趣,NuGet 包 rxx 可以提供帮助:

IObservable<string> obs=Observable2.FromPropertyChangedPattern(() => obj.Name)

(连同许多其他方法一起)

或者如果事件阻止属性更改/希望避免实现INotifyPropertyChanged

class ObserveEvent_Simple
{
    public static event EventHandler SimpleEvent;
    static void Main()
    {          
       IObservable<string> eventAsObservable = Observable.FromEventPattern(
            ev => SimpleEvent += ev,
            ev => SimpleEvent -= ev);
    }
}

类似于 u/Gideon Engelberth 来自http://rxwiki.wikidot.com/101samples#toc6

被覆盖的 https://rehansaeed.com/reactive-extensions-part2-wrapping-events/


此外,这篇CodeProject文章专门介绍了将事件转换为响应式事件的方法。

https://www.codeproject.com/Tips/1078183/Weak-events-in-NET-using-Reactive-Extensions-Rx

同时还涉及弱订阅


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接