如何将实体推入 Rx Observable?

7
我有一个类负责在频繁但不规则的时间间隔内生成事件,其他类必须消耗并操作这些事件。我想使用Reactive Extensions来完成这个任务。
消费者方面非常简单;我的消费者类实现了IObserver<Payload>,一切看起来都很好。问题出现在生产者类上。
直接实现IObservable<Payload>(也就是说,为IDisposable Subscribe(IObserver<Payload> )提供自己的实现)根据文档不推荐。它建议使用Observable.Create()函数系列进行组合。由于我的类将运行很长时间,我尝试使用var myObservable = Observable.Never()创建一个Observable,然后在有新的Payloads可用时调用myObservable.Publish(payloadData)。然而,当我这样做时,似乎没有触发我的消费者中的OnNext实现。

我认为,作为一种解决方法,我可以在我的类中创建一个事件,然后使用FromEvent函数创建Observable,但这似乎是一种过于复杂的方法(即,新的Observables“需要”事件工作似乎很奇怪)。这里有没有我忽视的简单方法?创建自己的Observable源的“标准”方式是什么?


1
一般来说,如果你发现自己正在实现 IObservable<T> 或者 IObserver<T> 中的任意一个,那么很可能你做错了什么。 - Enigmativity
3个回答

9
创建一个新主题<Payload>并调用它的OnNext方法来发送事件。您可以使用观察者订阅该主题。
主题的使用经常受到争议。有关主题使用的深入讨论(链接到入门指南),请参见此处 - 但总的来说,这个用例听起来是有效的(即它可能符合本地+热的标准)。
顺便说一下,Subscribe重载接受委托的情况下,可能无需提供IObserver<T>的实现。

Subject<T>的概念和提供的链接确实帮助我更全面地理解了这个问题。 - GWLlosa

1
除非我找到更好的方法,否则目前我决定使用BlockingCollection
var _itemsToSend = new BlockingCollection<Payload>();
IObservable<MessageQueuePayload> _deliverer =
_itemsToSend.GetConsumingEnumerable().ToObservable(Scheduler.Default);

1

Observable.Never() 不会“发送”通知,你应该使用 Observable.Return(yourValue)

如果你需要具体示例的指南,我建议阅读 Intro to Rx


1
我可能在这里非常困惑......如果我使用Observable.Return,那么该可观察对象只有一个值?我需要一个可以保持活动状态一段时间的可观察对象,以便我可以定期将更多的计算值推送到它上面。 - GWLlosa
你应该真正阅读“Rx 入门”,IObservable<T> 是一系列事件,当观察者收到 OnNext(T value) 时,你可以使用最新的值。 - kondas
2
根据文档,“一个最后的必要工厂方法或原始构造函数被称为 Return。它的作用是表示单个元素序列,就像 IEnumerable 序列世界中的单一单元数组一样。订阅观察者所观察到的行为是两条消息:带着值的 OnNext 和表示序列结束已经被接收的 OnCompleted: IObservable<int> source = Observable.Return(42); 运行这段代码将产生以下输出: OnNext: 42 OnCompleted”。我不想要完成信号,我希望序列保持打开/活动状态。 - GWLlosa
你的源是什么?我写了“事件序列”,因为它是最简单的概念。例如,函数FromEvent钩住 .Net 事件,并且每次系统触发“事件”时,它都会向你的观察者发送 OnNext()。 - kondas

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接