限制IObservable的速率会导致观察者不被调用。

3

我有这样一个简单的代码,它:

  • 创建 IObservable
  • 对其进行 0.5 秒采样
  • 使用 ThreadPool 调度程序对其进行订阅
  • 使用 SynchronizationContext 进行观察

以下是代码:

private void DisplayPoints()
{
    var x = 0;
    var ob = this.GeneratePoints();
    ob
      .Sample(TimeSpan.FromMilliseconds(500))
      .SubscribeOn(ThreadPoolScheduler.Instance)
      .ObserveOn(SynchronizationContext.Current)
      .Subscribe(d => Console.WriteLine(d));
}

private IObservable<double> GeneratePoints()
{
    return Observable.Create<double>(o => this.GeneratePoints(o));
}

private IDisposable GeneratePoints(IObserver<double> observer)
{
    var i = 0;
    while (true)
    {
        var value = random.Next(0, 100) * (1 / (double)random.Next(1, Math.Min(50, Math.Max(i, 1))));

        observer.OnNext(value);

        i++;
    }

    return Disposable.Empty;
}

然而,控制台从未输出任何内容(即匿名观察者从未被调用)。如果我删除Sample操作符,观察者会被调用,尽管这显然不是想要的行为(UI线程将会遭受攻击)。
我显然在这里缺少了什么。我的意图是生成数据,通过IObserver推送数据,并通过UI显示其中一部分。
编辑:由于有些人误解了我的意图(尽管它们明确陈述在上面),我应该重申一下我要做的事情:
- 使用算法生成一些数据(对于我的问题来说,double值似乎已经足够) - 在GUI中显示数据
使用IObservable和响应式扩展似乎是解决我的问题的好办法。
再次强调:我在真实代码中不会返回随机数 - 这只是一个占位符,以使我的预期行为正常工作。
3个回答

1
我猜测您的问题与 Throttle 引入了 DefaultScheduler.Instance 内部的并发有关,以及您对 IDisposable GeneratePoints(IObserver<double> observer) 的实现是非标准的。
请尝试像这样重新实现 IObservable<double> GeneratePoints():
private IObservable<double> GeneratePoints()
{
    return Observable.Generate<int, double>(
        0,
        i => true,
        i => i + 1,
        i => random.Next(0, 100) * (1 / (double)random.Next(1, Math.Min(50, Math.Max(i, 1)))));
}

那可能有所帮助。

问题出在你的可观察对象在订阅过程中直接推出值。创建可观察对象时,应始终尝试使用标准内置运算符。上面的代码使用了内置的 Generate 运算符,因此它应该更好地适用于你的代码。


Observable.Generate 似乎是创建 IObservable 更好的方式。我不喜欢这段代码的 "意大利面条式" 结构(虽然添加新行确实有所帮助)。在真正的代码中,我不会返回随机数 - 这只是一个占位符,以使我的预期行为得以实现。 - mnn

1

你可能不想在紧密循环中生成随机数。最好使用时间间隔。下面的代码每200毫秒生成一组随机数。

IObservable<double> observable =
     Observable.Interval(TimeSpan.FromMillSeconds(200))
          .Select((t,i) => random.Next(0, 100) 
                      * (1 / (double)random.Next(1, Math.Min(50, Math.Max(i, 1)))))

Enigmativity 给你写的代码也是紧凑循环。他关于您的错误在订阅过程中推出值的观点也是正确的。要使其正常工作,您需要进行最小更改。
    private static Task GeneratePoints(IObserver<double> observer, CancellationToken token)
    {
        return Task.Run(() =>
        {
            var i = 0;
            var random = new Random();
            while ( true )
            {
                token.ThrowIfCancellationRequested();

                var value = random.Next(0, 100) * ( 1 / ( double ) random.Next(1, Math.Min(50, Math.Max(i, 1))) );

                observer.OnNext(value);

                i++;
            }
        });
    }

稍后一些时间

    Observable.Create<double>((observer, token) => GeneratePoints(observer, token));

请注意传递的取消令牌。当序列的订阅者取消订阅时,此令牌将被设置并且循环将终止。
但这是很多工作,Enigmativities 的答案更简单,并为您抽象了上面的代码。对于更复杂的情况,手动执行此操作仍然非常有用。

为什么要在值之间引入延迟? - Enigmativity
OP 问了关于节流的问题,但在没有时间间隔的事件范围内,这似乎并不合理。我想他可能正在尝试在事件之间设置延迟。如果您确实想要转储事件,则您的代码完全正确。在我自己的答案之前,我已经为您投票了 ;) - bradgonesurfing
看起来 OP 确实在引入延迟,但是将延迟添加到源 observable 中可能会引入意外的副作用。这里可能不是什么大问题,但通常很难调试这种情况。 - Enigmativity
1
当然。您需要确切地了解使用情况,除了知道OP实际上并不理解基本的RX组合器之外,我们没有足够的上下文。 - bradgonesurfing
1
顺便说一下,Observable.Generate有一个重载函数可以通过Timespan或特定的DateTime来延迟每次迭代 - 它甚至可以使用状态,以便每次迭代都可以进行变化。此外,Observable.Generate在每次迭代时都会让出调度程序(每个后续迭代都会在前面完成后进行调度),因此紧密循环不会导致调度程序饥饿。 - James World
显示剩余2条评论

0

Throttle 只有在至少间隔了 500 毫秒(在您的情况下)后才会让一个值通过。由于 GeneratePoints 推送的值比这个速度快得多,因此不会发生任何事情。在这种情况下,Sample 可能是您想要的运算符,它将每 500ms 产生一个值。

Source:       1111111111111111111----------111---111111
Throttle (5): -----------------------1-----------------
Sample (5):   ----1----1----1----1---1------------1----1

不幸的是,“Sample”并没有帮助——我得到了相同的结果(观察者没有被调用),除非将采样间隔降低到1毫秒以下,否则根本不会调用观察者。 - mnn

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接