7得票6回答
如何正确创建一个Observable以读取流并等待其结束

我在这里遇到了困难。通常我会读一本书,但现在还没有找到合适的。我已经找到了许多关于使用RX读取流的各种示例,但是我很难理解。 我知道可以使用Observable.FromAsyncPattern创建Stream的BeginRead/EndRead或BeginReadLine/EndReadL...

7得票1回答
如何在 Rx.NET 中正确地限制并发性。

请注意以下代码片段: 请观察以下代码片段: var result = await GetSource(1000).SelectMany(s => getResultAsync(s).ToObservable()).ToList(); 这段代码的问题在于 getResultAsyn...

8得票1回答
为什么两个进程比两个线程更有优势?

我有一个基于MSMQ的位置应用程序,我从现场单元接收位置更新,并将其处理并放入数据库中。 更新过程不依赖于DB外部,因此我的应用程序可以配置为使用可变数量的线程。由于我希望在故障下该进程具有鲁棒性,因此我希望尽可能多地处理消息,但不要超过限制(因此如果系统失败,我可以从上次离开的地方继续)。...

35得票9回答
使用Rx,当我的Subscribe方法正在运行时,如何忽略除最新值以外的所有值

使用Reactive Extensions,我想忽略在我的Subscribe方法运行时发生的事件流中的消息。也就是说,有时候我处理消息的时间比消息之间的时间更长,因此我想要丢弃我没有时间处理的消息。 然而,当我的Subscribe方法完成时,如果有任何消息通过,我希望处理最后一个消息。所以我...

32得票5回答
响应式扩展(Rx)+ MVVM =?

解释Reactive Extensions(Rx)强大之处的主要示例之一是将现有的鼠标事件组合成一个新的“事件”,表示鼠标拖动期间的差异:var mouseMoves = from mm in mainCanvas.GetMouseMove() let loc...

8得票4回答
使用Rx,有什么好的方法来运行定期任务,并限制单个并发执行?

我希望能够定期运行任务,但限制同一时间只能运行一个方法的执行。 我尝试使用Rx进行实验,但不确定如何强制实施最多一次并发限制。 var timer = Observable.Interval(TimeSpan.FromMilliseconds(100)); timer.Subscribe(...

10得票3回答
RxJS 和 WebWorkers

快速问题 如果我有一个WebWorker,它有一个返回Observable<Any>到UI代码的函数,如果我订阅这个Observable,那么这个Observable运行在UI线程还是WebWorker线程上? 我问这个问题是因为我正在编写使用RxJS的Angular2应用程序...

7得票1回答
Rx和异步NUnit测试

我正在尝试为项目创建异步单元测试,但是无法理解如何等待异步主题完成: [Test] public async void MicroTest() { var value = 2; var first = new AsyncSubject...

11得票1回答
如何从Web API获取IObservable

我有一个简单的Web API,返回一个Iobservable。我正在使用HttpClient获取Observable,以便可以订阅它。我的问题是,在订阅时返回的Iobservable会发送一个“空”结果。 服务器public IObservable<DataItem> GetDa...

28得票3回答
我可以帮你翻译成中文:如何清空ReplaySubject的缓存?

如何清除ReplaySubject的缓冲区? 我需要定期清除缓冲区(在我的情况下为每日结束事件),以防止ReplaySubject不断增长并最终占用所有内存。 理想情况下,我希望保留相同的ReplaySubject,因为客户端订阅仍然有效。