Rx Throttle(...).ObserveOn(scheduler)和Throttle(..., scheduler)之间的区别

6

I have the following code:

IDisposable subscription = myObservable.Throttle(TimeSpan.FromMilliseconds(50), RxApp.MainThreadScheduler)
                                       .Subscribe(_ => UpdateUi());

正如预期的那样,UpdateUi()总是在主线程上执行。当我将代码更改为

IDisposable subscription = myObservable.Throttle(TimeSpan.FromMilliseconds(50))
                                       .ObserveOn(RxApp.MainThreadScheduler)
                                       .Subscribe(_ => UpdateUi());

UpdateUI() 将在后台线程中执行。

为什么 Throttle(...).ObserveOn(scheduler) 不等同于 Throttle(..., scheduler)

3个回答

5
在你提供的代码示例中,无论哪种情况,UpdateUi 都将始终在由 RxApp.MainThreadScheduler 指定的调度程序上调用。我可以肯定地说这一点,因为 ObserveOn 是一个装饰器,它确保订阅者的 OnNext 处理程序在指定的调度程序上被调用。有关详细分析,请参见此处
所以说,这有点令人困惑。要么RxApp.MainThreadScheduler未引用正确的调度程序调度程序,要么UpdateUi正在离开调度程序线程。前者并不罕见-请参见此处,其他人也遇到了这个问题。我不知道那种情况的问题是什么。也许 @PaulBetts 可以发表意见,或者您可以在https://github.com/reactiveui/上提出问题。无论如何,我会仔细检查您在这里的假设,因为我希望这是一个经过充分测试的领域。您是否有完整的复制代码?
至于您的具体问题,Throttle(...).ObserveOn(scheduler)Throttle(..., scheduler) 之间的区别如下:
在第一种情况下,当未指定调度程序时,Throttle 将使用默认平台调度程序来引入运行其计时器所需的并发性,在 WPF 上,这将使用线程池线程。因此,所有限制将在后台线程上完成,并且由于以下 ObserveOn,只有释放的事件才会传递到订阅者所指定的调度程序。
Throttle 指定调度程序的情况下,限流是在该调度程序上完成的-被抑制的事件和已释放的事件都将在该调度程序上进行管理,并且订阅者也将在同一调度程序上被调用。
因此,无论哪种方式,UpdateUi 都将在 RxApp.MainThreadScheduler 上调用。
在大多数情况下,您最好在调度程序上限制 UI 事件,因为在后台线程上运行单独的计时器并支付上下文转换的成本,如果只有一小部分事件会通过限制,则通常成本更高。

为了确认你没有遇到 RxApp.MainThreadScheduler 的问题,建议你尝试通过其他方式显式地指定调度器或 SynchronizationContext。具体操作方法取决于你所在的平台 - 如果可以使用 ObserveOnDispatcher(),那最好不过;否则可以使用适当的 ObserveOn 重载方法。只要正确导入了相应的 Rx 库,就有关于控件、同步上下文和调度器的选项可供使用。


1
谢谢,结果似乎我的MainThreadScheduler出了问题(请参见https://dev59.com/iojca4cB1Zd3GeqPvlww#28827962)。 - larsmoa
我刚刚发现,这两者之间的区别还有更多内容 - noseratio - open to work

5

经过一番调查,我认为这是由于运行时使用了与我预期不同的Rx版本引起的(我正在为第三方应用程序开发插件)。

我不确定原因,但似乎默认的RxApp.MainThreadScheduler无法正确初始化。默认实例是一个WaitForDispatcherSchedulersource)。该类中的所有函数都依赖于attemptToCreateScheduler

    IScheduler attemptToCreateScheduler()
    {
        if (_innerScheduler != null) return _innerScheduler;
        try {
            _innerScheduler = _schedulerFactory();
            return _innerScheduler;
        } catch (Exception) {
            // NB: Dispatcher's not ready yet. Keep using CurrentThread
            return CurrentThreadScheduler.Instance;
        }
    }

在我的情况下,似乎发生的是_schedulerFactory()抛出异常,导致返回CurrentThreadScheduler.Instance
通过手动初始化RxApp.MainThreadSchedulernew SynchronizationContextScheduler(SynchronizationContext.Current),可以得到预期的行为。

1

我刚遇到了一个问题,首先引导我来到这个问题,然后进行一些实验。

事实证明,Throttle(timeSpan, scheduler) 足够聪明,可以在源发出另一个事件 Y 之前 "取消" 已经计划的去抖动事件 X。因此,只有最后一个去抖动事件 Y 最终会被观察到。

使用 Throttle(timeSpan).ObserveOn(scheduler),将同时观察到 XY

因此,在概念上,这是两种方法之间的重要区别。不幸的是,Rx.NET文档很少,但我相信这种行为是有意设计的,并且对我来说是有意义的。

为了用示例说明这一点(fiddle):

#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using static System.Console;

public class Program
{
    static async Task ThrottleWithScheduler()
    {
        WriteLine($"\n{nameof(ThrottleWithScheduler)}\n");

        var sc = new CustomSyncContext();
        var scheduler = new SynchronizationContextScheduler(sc);
        var subj = new BehaviorSubject<string>("A");

        subj
            .Do(v => WriteLine($"Emitted {v} on {sc.Elapsed}ms"))
            .Throttle(TimeSpan.FromMilliseconds(500), scheduler)
            .Subscribe(v => WriteLine($"Observed {v} on {sc.Elapsed}ms"));

        await Task.Delay(100);
        subj.OnNext("B");
        await Task.Delay(200);
        subj.OnNext("X");
        await Task.Delay(550);
        subj.OnNext("Y");

        await Task.Delay(2000);
        WriteLine("Finished!");
    }

    static async Task ThrottleWithObserveOn()
    {
        WriteLine($"\n{nameof(ThrottleWithObserveOn)}\n");

        var sc = new CustomSyncContext();
        var scheduler = new SynchronizationContextScheduler(sc);
        var subj = new BehaviorSubject<string>("A");

        subj
            .Do(v => WriteLine($"Emitted {v} on {sc.Elapsed}ms"))
            .Throttle(TimeSpan.FromMilliseconds(500))
            .ObserveOn(scheduler)
            .Subscribe(v => WriteLine($"Observed {v} on {sc.Elapsed}ms"));

        await Task.Delay(100);
        subj.OnNext("B");
        await Task.Delay(200);
        subj.OnNext("X");
        await Task.Delay(550);
        subj.OnNext("Y");

        await Task.Delay(2000);
        WriteLine("Finished!");
    }

    public static async Task Main()
    {
        await ThrottleWithScheduler();
        await ThrottleWithObserveOn();
    }
}

class CustomSyncContext : SynchronizationContext
{
    private readonly Stopwatch _sw = Stopwatch.StartNew();
    public long Elapsed { get { lock (_sw) { return _sw.ElapsedMilliseconds; } } }
    public override void Post(SendOrPostCallback d, object? state)
    {
        WriteLine($"Scheduled on {Elapsed}ms");
        Task.Delay(100).ContinueWith(
            continuationAction: _ =>
            {
                WriteLine($"Executed on {Elapsed}ms");
                d(state);
            },
            continuationOptions: TaskContinuationOptions.ExecuteSynchronously);
    }
}

输出:

ThrottleWithScheduler

Emitted A on 18ms
Emitted B on 142ms
Emitted X on 351ms
Scheduled on 861ms
Emitted Y on 907ms
Executed on 972ms
Scheduled on 1421ms
Executed on 1536ms
Observed Y on 1539ms
Finished!

ThrottleWithObserveOn

Emitted A on 4ms
Emitted B on 113ms
Emitted X on 315ms
Scheduled on 837ms
Emitted Y on 886ms
Executed on 951ms
Observed X on 953ms
Scheduled on 1391ms
Executed on 1508ms
Observed Y on 1508ms
Finished!

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接