为什么 Rx Observable.Subscribe 会阻塞我的线程?

4

你好,我尝试了101个Rx示例中的一个:

    static IEnumerable<int> GenerateAlternatingFastAndSlowEvents()
    {
        int i = 0;

        while (true)
        {
            if (i > 1000)
            {
                yield break;
            }
            yield return i;
            Thread.Sleep(i++ % 10 < 5 ? 500 : 1000);
        }
    }

    private static void Main()
    {
        var observable = GenerateAlternatingFastAndSlowEvents().ToObservable().Timestamp();
        var throttled = observable.Throttle(TimeSpan.FromMilliseconds(750));

        using (throttled.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
        {
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }

我不明白为什么“按任意键取消订阅”这行文字从未显示。我的理解是订阅是异步的,你订阅后它会立即返回。是什么导致我的主线程阻塞了?

2个回答

7
阻塞是由于你使用可枚举循环 while (true)IEnumerable<T>.ToObservable() 扩展方法默认使用 CurrentThreadScheduler 的组合导致的。
如果你将 Scheduler.TaskPool(或在 .NET 4 之前使用 Scheduler.ThreadPool)提供给 ToObservable 的重载方法,你应该会看到你期望的行为(尽管它不会在主线程上调用你的订阅者,FYI)。
话虽如此,我认为你会发现你的 Thread.SleepThrottle 的组合会按照你的预期工作。你最好创建一个使用调度程序来安排延迟的自定义可观察对象。

2

我同意Richard的看法。

.ToObservable() 的实现如下:

public static IObservable<TSource> ToObservable<TSource>(
    this IEnumerable<TSource> source)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    return source.ToObservable<TSource>(Scheduler.CurrentThread);
}

它使用Scheduler.CurrentThread调用.ToObservable(IScheduler)重载,因为你使用.Sleep(...)来引起延迟,所以在代码可以超出.Subscribe(...)方法之前,可观察对象必须完成。就像所有东西都运行在单个线程上一样,只需考虑这段代码的行为即可。

要解决这个问题,您可以像Richard建议的那样使用任务池或线程池调度程序,但我认为您的代码存在更根本性的问题。那就是您正在使用“老派”的线程睡眠而不是依赖于Rx方法。

请尝试使用以下代码生成您的可观察对象:

var observable =
    Observable
        .GenerateWithTime(0, i => i <= 1000, i => i + 1,
            i => i, i => TimeSpan.FromMilliseconds(i % 10 < 5 ? 500 : 1000))
        .Timestamp();

GenerateWithTime(...) 方法实现了你的 GenerateAlternatingFastAndSlowEvents 方法的所有功能,但是它直接创建可观察对象,并使用 Scheduler.ThreadPool 来执行,因此无需指定任何调度程序。


因此,使用.ToObservable创建一个在当前线程上运行的Observable,因此它是主线程运行Generate..方法。因此,Subscribe不会阻塞线程,因为它正在执行Observable。那么,如果我使用SubscribeOn并传递ThreadPoolScheduler,为什么会看到消息“按任意键取消订阅”?如果我的线程正在忙于执行Observable,那么在哪里订阅就无关紧要了。 - Eldar
@Eldar - 一旦您使用SubscribeOn(Scheduler.ThreadPool),订阅和观察者将在新线程上运行,因此Subscribe方法会立即返回,您将看到您的消息。只有当您的可观察对象使用CurrentThread调度程序进行调度时,您的Subscribe方法和观察者才会在当前线程上运行。 - Enigmativity

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接