为什么在.NET Reactive Extensions中不推荐使用Subjects?

130
我目前正在熟悉用于.NET的Reactive Extensions框架,并通过各种介绍资源(主要是http://www.introtorx.com)逐步掌握它。
我们的应用程序涉及多个硬件接口,用于检测网络帧,这些将成为我的IObservables,然后我有各种组件来消耗这些帧或对数据进行某种转换并生成新类型的帧。还会有其他组件需要显示每n帧,例如。 我相信Rx将对我们的应用程序有用,但是我在IObserver接口的实现细节上遇到了困难。
我阅读过的大多数资源都说我不应该自己实现IObservable接口,而应使用提供的函数或类之一。 从我的研究中得知,创建一个Subject<IBaseFrame>将为我提供所需的内容,我将拥有一个单线程,用于从硬件接口读取数据,然后调用我的Subject<IBaseFrame>实例的OnNext函数。然后,不同的IObserver组件将从该Subject接收其通知。

我感到困惑的是这个教程附录中给出的建议,其中说:

避免使用subject类型。Rx实际上是一种函数式编程范例。使用subjects意味着我们现在正在管理状态,这可能会导致突变。同时处理突变状态和异步编程非常难以正确地完成。此外,许多运算符(扩展方法)已经被精心编写,以确保维护订阅和序列的正确和一致的生命周期;当您引入subjects时,您可能会破坏这个过程。未来的版本也可能会看到显著的性能下降,如果您明确使用subjects。

我的应用程序非常重视性能,我显然会在进入生产代码之前测试使用Rx模式的性能;但我担心使用Subject类违背了Rx框架的精神,并且将来的框架版本会影响性能。

有没有更好的方法做我想做的事情?硬件轮询线程将持续运行,无论是否有观察者(否则HW缓冲区将会备份),因此这是一个非常热门的序列。然后我需要将收到的帧传递给多个观察者。

任何建议都将不胜感激。


1
这真的帮助了我对这个主题的理解,我正在整理如何在我的应用程序中使用它。我知道它们是正确的 - 我有一系列非常推动导向的组件管道,我需要在UI线程上进行各种过滤和调用以在GUI中显示,以及缓冲最后接收到的帧等等 - 我只需要确保第一次就做对! - Anthony
5个回答

89

好的,如果我们忽略我教条主义的方式并完全忽略"主题是好/坏"。让我们看看问题空间。

我打赌你要么有1个系统样式需要整合到其中之一。

  1. 当消息到达时,系统会触发事件或回调
  2. 您需要轮询系统以查看是否有任何消息要处理

对于选项1,很容易,我们只需使用适当的FromEvent方法包装它,然后就完成了。去酒吧吧!

对于选项2,现在我们需要考虑如何轮询它以及如何高效地执行此操作。还有当我们获得值时,如何发布它?

我想你会想要一个专用线程来进行轮询。您不希望其他编码人员占用线程池/任务池,使您处于线程池饥饿状态。或者您不想烦恼上下文切换(我猜)。因此假设我们有自己的线程,我们可能会有某种While / Sleep循环坐在那里进行轮询。当检查发现一些消息时,我们会发布它们。嗯,所有这些听起来都非常适合Observable.Create。现在我们可能不能使用While循环,因为那样就无法返回一个可处置对象以允许取消。幸运的是,您已经阅读了整本书,因此精通递归调度!

我想这样做可能有效。#未测试

public class MessageListener
{
    private readonly IObservable<IMessage> _messages;
    private readonly IScheduler _scheduler;

    public MessageListener()
    {
        _scheduler = new EventLoopScheduler();

        var messages = ListenToMessages()
                                    .SubscribeOn(_scheduler)
                                    .Publish();

        _messages = messages;
        messages.Connect();
    }

    public IObservable<IMessage> Messages
    {
        get {return _messages;}
    }

    private IObservable<IMessage> ListenToMessages()
    {
        return Observable.Create<IMessage>(o=>
        {
                return _scheduler.Schedule(recurse=>
                {
                    try
                    {           
                        var messages = GetMessages();
                        foreach (var msg in messages)
                        {
                            o.OnNext(msg);
                        }   
                        recurse();
                    }
                    catch (Exception ex)
                    {
                        o.OnError(ex);
                    }                   
                });
        });
    }

    private IEnumerable<IMessage> GetMessages()
    {
         //Do some work here that gets messages from a queue, 
         // file system, database or other system that cant push 
         // new data at us.
         // 
         //This may return an empty result when no new data is found.
    }
}

我不喜欢使用Subject的原因是通常是因为开发人员对问题没有清晰的设计。开发者会添加一个Subject,随意操作,然后让技术支持猜测出问题出在哪里。当你使用Create/Generate等方法时,你可以将效果局限在序列中。你可以在一个方法中看到所有内容,并且知道没有其他人引入了危险的副作用。如果我看到有一个Subject字段,我现在必须去查找类中它被使用的所有位置。如果某些人公开地暴露了Subject,则所有的赌注都是无效的,谁知道这个序列是如何被使用的! 异步/并发/Rx很难。你不需要通过允许副作用和因果编程来使它变得更加困难。


13
我现在才看到这个答案,但我觉得我应该指出我绝不会考虑暴露Subject接口!我正在使用它在一个封闭类中提供IObservable<>实现(同时暴露IObservable<>)。我绝对能理解为什么暴露Subject<>接口会是一件坏事™。 - Anthony
2
对于您的个人项目@jeromerg,这可能是可以接受的。然而根据我的经验,开发者在处理WPF、MVVM、单元测试GUI设计时会遇到困难,加入Rx会使事情变得更加复杂。我尝试过使用BehaviorSubject作为属性模式,但我发现如果我们使用标准的INPC属性,然后使用一个简单的扩展方法将其转换为IObservable,那么它会更容易被其他人采用。此外,您需要自定义WPF绑定来与您的行为主题一起工作。现在您的团队不得不学习WPF、MVVM、Rx和您的新框架。 - Lee Campbell
你可以查看我的 RxCookbook 示例,展示了使用扩展方法使 INPC 可观察 - https://github.com/LeeCampbell/RxCookbook/blob/master/Model/PropertyChange.md,并且类似的方法可以使 ObservableCollections 以 Rx 方式可观察 - https://github.com/LeeCampbell/RxCookbook/blob/master/Model/CollectionChange.md。 - Lee Campbell
2
@LeeCampbell,以你的代码示例来说,正常的方式是由系统构建MessageListener(你可能以某种方式注册类名),然后告诉你系统将调用OnCreate()和OnGoodbye(),并在生成消息时调用message1()、message2()和message3()。看起来,messageX[123]会在主题上调用OnNext,但有更好的方法吗? - James Moore
1
@JamesMoore,这些东西用具体的例子来解释会更容易。如果您知道一个使用Rx和Subjects的开源Android应用程序,那么也许我可以找时间看看是否能提供更好的方法。我确实明白站在高台上说Subjects不好并不是很有帮助。但我认为像IntroToRx、RxCookbook和ReactiveTrader这样的东西都提供了各种使用Rx的示例。 - Lee Campbell
显示剩余9条评论

44
一般情况下应避免使用Subject,但是对于你在这里要做的事情,我认为它们非常有效。当我在Rx教程中遇到“避免使用Subjects”消息时,我提出了一个相似的问题
引用Dave Sexton(来自Rxx):

“Subjects是Rx的有状态组件。当您需要将事件视为字段或本地变量时,它们非常有用。”

我倾向于将它们用作进入Rx的入口点。因此,如果我有一些需要说“发生了某事”的代码(就像你所拥有的那样),我会使用Subject并调用OnNext。然后将其公开为IObservable供其他人订阅(您可以在主题上使用AsObservable()以确保没有人可以将其转换为Subject并搞乱事情)。
你也可以使用.NET事件并使用FromEventPattern来实现此操作,但是如果我只是将事件转换为IObservable,那么我不认为使用事件而不是Subject有任何好处(这可能意味着我在这里忽略了某些内容)。
但是,你应该非常避免订阅带有SubjectIObservable,即不要将Subject传递到IObservable.Subscribe方法中。

为什么你需要状态?如我的回答所示,如果将问题分解成独立的部分,你实际上并不需要管理状态。在这种情况下,不应该使用主题。 - casperOne
9
@casperOne,除了Subject<T>或事件(它们都有要调用的对象集合,观察者或事件处理程序)之外,您不需要状态。如果添加事件的唯一原因是将其包装在FromEventPattern中,则我只是更喜欢使用Subject。除了异常计划的更改之外,这种避免使用Subject的方式没有任何好处。再次强调,我可能会忽略其他一些优于Subject的方面。提到状态只是引用的一部分,不过保留此部分似乎更清晰? - Wilka
@casperOne - 但是你也不应该只是为了用FromEventPattern来包装一个事件。这显然是一个可怕的想法。 - James Moore
3
我已经在这篇博客文章中更深入地解释了我的引言。 - Dave Sexton
我倾向于将它们用作进入 Rx 的入口点。 这对我来说非常准确。我有一个情况,其中有一个 API 被调用时会生成事件,我希望将其通过响应式处理管道传递。对我来说,Subject 是答案,因为在 RxJava 中似乎不存在 FromEventPattern。 - scorpiodawg
@DaveSexton的博客文章链接已更新 - https://www.davesexton.com/blog/post/To-Use-Subject-Or-Not-To-Use-Subject.aspx,非常优秀。 - Bondolin

36

经常情况下,当你在管理一个主题时,实际上你只是重新实现了Rx中已经存在的功能,而且可能没有那么健壮、简单和可扩展。

当你试图将一些异步数据流适应于Rx(或从当前不是异步的数据流创建异步数据流)时,最常见的情况通常有:

  • 数据源来自事件:正如Lee所说,这是最简单的情况:使用FromEvent并前往酒吧。

  • 数据源来自同步操作,你需要轮询更新(例如WebService或数据库调用):在这种情况下,您可以使用Lee建议的方法,或者对于简单的情况,您可以使用类似于Observable.Interval.Select(_ => <db fetch>)的东西。您可能需要使用DistinctUntilChanged()防止在源数据未更改时发布更新。

  • 数据源来自某种调用您回调的异步API:在这种情况下,使用Observable.Create将您的回调钩入调用Observer.OnNext/OnError/OnComplete。

  • 数据源是一次阻塞调用,直到有新数据可用(例如一些同步套接字读取操作):在这种情况下,您可以使用Observable.Create将从套接字读取数据并发布到Observer.OnNext的命令式代码进行封装。这可能类似于使用Subject处理的内容。

使用Observable.Create与创建管理Subject的类相当于使用yield关键字与创建实现IEnumerator的整个类相当。当然,您可以编写一个IEnumerator来像yield代码一样干净、好用,但哪一个更好地封装和感觉更整洁?对于Observable.Create与管理Subjects同样适用。

Observable.Create提供了一种干净的惰性设置和清理方法。如果用一个包装Subject的类来实现,你需要一种启动方法...你如何知道何时调用它?或者你总是在没有人听的情况下启动它吗?完成后,如何停止从套接字/轮询数据库等读取数据?你必须有某种Stop方法,而且你仍然必须访问不仅订阅的IObservable,而且还要访问创建Subject的类。

使用Observable.Create,这一切都包含在一个地方。Observable.Create的主体直到有人订阅时才会运行,因此,如果没有人订阅,你永远不会使用你的资源。而Observable.Create返回一个Disposable,可以干净地关闭您的资源/回调等-当Observer取消订阅时会调用该方法。生成Observable所使用的资源的生命周期与Observable本身的生命周期紧密相关。


1
Observable.Create的解释非常清晰。谢谢! - Evan Moran
1
我仍然有使用主题的情况,其中代理对象公开可观察对象(例如它只是一个可更改的属性)。不同的组件将调用代理来告知该属性何时更改(通过方法调用),并且该方法会执行OnNext。消费者进行订阅。我认为在这种情况下我会使用BehaviorSubject,这合适吗? - Frank Schwieterman
1
这取决于情况。良好的 Rx 设计倾向于将系统转变为异步/反应式架构。将反应式代码的小组件与命令式设计的系统清晰地集成在一起可能很困难。临时解决方案是使用 Subjects 将命令式操作(函数调用、属性设置)转换为可观察事件。然后你就会得到一些反应式代码的小片段,但没有真正的“啊哈!”时刻。改变设计以建模数据流并对其做出反应通常会带来更好的设计,但这是一种普遍的变化,需要思维方式的转变和团队的支持。 - Niall Connaughton
1
作为一个 Rx 初学者,我想说:通过使用 Subjects,你可以在一个成熟的命令式应用程序中进入 Rx 的世界,并逐渐进行转换。这样你就可以获得第一手经验... 然后再将代码改为一开始就应该是的样子(哈哈)。但是,作为一个入门者,我认为使用 Subjects 是值得尝试的。 - Robetto

9
引用块文本已经解释了为什么你不应该使用Subject<T>,简单来说,你正在将观察者和可观察者的功能结合在一起,并在它们之间注入某种状态(无论是封装还是扩展)。
这就是你遇到麻烦的地方;这些职责应该是彼此分离和明确的。
话虽如此,在你的具体情况下,我建议你将你的关注点分成更小的部分。
首先,你有一个始终监视硬件信号以引发通知的热线程。你通常会如何做呢?Events。所以让我们从那里开始。
让我们定义你的事件将触发的EventArgs
// The event args that has the information.
public class BaseFrameEventArgs : EventArgs
{
    public BaseFrameEventArgs(IBaseFrame baseFrame)
    {
        // Validate parameters.
        if (baseFrame == null) throw new ArgumentNullException("IBaseFrame");

        // Set values.
        BaseFrame = baseFrame;
    }

    // Poor man's immutability.
    public IBaseFrame BaseFrame { get; private set; }
}

现在,将触发事件的类。请注意,这可以是静态类(因为始终有一个线程运行以监视硬件缓冲区),或者是您按需调用并订阅的东西。您需要根据需要进行修改。
public class BaseFrameMonitor
{
    // You want to make this access thread safe
    public event EventHandler<BaseFrameEventArgs> HardwareEvent;

    public BaseFrameMonitor()
    {
        // Create/subscribe to your thread that
        // drains hardware signals.
    }
}

现在您有一个公开事件的类。可观察对象与事件配合得很好。如果您遵循标准事件模式,通过Observable类上的静态FromEventPattern方法将事件流(将事件流视为多次触发事件)转换为IObservable<T>实现,就可以轻松完成。有了事件源和FromEventPattern方法,我们可以轻松创建IObservable<EventPattern<BaseFrameEventArgs>>EventPattern<TEventArgs>体现了.NET事件中所见到的内容,特别是从EventArgs派生的实例和表示发送方的对象),如下所示:
// The event source.
// Or you might not need this if your class is static and exposes
// the event as a static event.
var source = new BaseFrameMonitor();

// Create the observable.  It's going to be hot
// as the events are hot.
IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
    FromEventPattern<BaseFrameEventArgs>(
        h => source.HardwareEvent += h,
        h => source.HardwareEvent -= h);

当然,您希望有一个 IObservable<IBaseFrame>,但这很容易,使用 Observable 类上的 Select 扩展方法 创建一个投影(就像在 LINQ 中一样),我们可以将所有这些包装在一个易于使用的方法中:
public IObservable<IBaseFrame> CreateHardwareObservable()
{
    // The event source.
    // Or you might not need this if your class is static and exposes
    // the event as a static event.
    var source = new BaseFrameMonitor();

    // Create the observable.  It's going to be hot
    // as the events are hot.
    IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
        FromEventPattern<BaseFrameEventArgs>(
            h => source.HardwareEvent += h,
            h => source.HardwareEvent -= h);

    // Return the observable, but projected.
    return observable.Select(i => i.EventArgs.BaseFrame);
}

7
感谢你的回复@casperOne,这是我的初始思路,但为了用Rx包装它而添加一个事件感觉有些“不对”。目前我使用委托(是的,我知道那正是事件的本质!)以配合用于加载和保存配置的代码,这必须能够重建组件管道,委托系统为我提供了最大的灵活性。现在在这个领域里,Rx让我感到头疼,但框架中其他方面的强大功能使解决配置问题变得非常值得。 - Anthony
@Anthony 如果你能让他的代码示例正常工作,那太好了,但正如我所评论的,它毫无意义。至于感觉“不对”,我不知道为什么将事物细分为逻辑部分似乎“不对”,但是在你的原始帖子中没有足够的细节说明如何最好地将其转换为IObservable<T>,因为没有提供有关如何使用该信息进行信号传递的任何信息。 - casperOne
@casperOne 在您看来,使用Subjects是否适合于消息总线/事件聚合器? - kitsune
1
@kitsune 不,我不明白为什么会这样。如果你在考虑“优化”,那么你必须问自己一个问题,即是否Rx是问题的原因?你已经测量过了吗? - casperOne
3
我同意casperOne的观点,拆分关注点是个好主意。但我想指出,如果你采用硬件到事件到Rx的模式,你会失去错误语义。任何丢失的连接或会话等都不会暴露给消费者。现在,消费者无法决定是否要重试、断开连接、订阅另一个序列或其他操作。 - Lee Campbell

0

一般化地认为主题不适合用于公共接口是不好的。虽然这显然不是反应式编程方法的正确方式,但对于经典代码来说,这绝对是一个很好的改进/重构选项。

如果您有一个带有公共设置访问器的普通属性,并且您想要通知更改,那么使用BehaviorSubject替换它是没有问题的。INPC或其他事件并不那么干净,而且个人感觉很疲劳。因此,您可以使用BehaviorSubjects作为公共属性,而不是普通属性,并放弃INPC或其他事件。

此外,Subject接口使您的接口用户更加了解您的属性功能,并更有可能订阅,而不仅仅是获取值。

如果您希望其他人监听/订阅属性更改,则最好使用它。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接