为什么可以等待Rx Observable?

7

我刚刚注意到await关键字可以与Rx Observable一起使用,例如:

await Observable.Interval(TimeSpan.FromHours(1));

我很确定它只能与任务一起使用。

那么是什么让它成为可能呢?观察者模式的知识是硬编码到编译器中的吗?

2个回答

9
不,编译器没有关于“IObservable”的特殊知识。根据C# 5规范第7.7.7.1节的规定,如果对象具有一个名为“GetAwaiter”的方法或者作用域中存在一个返回实现了“System.Runtime.CompilerServices.INotifyCompletion”的类型的扩展方法,则可以使用await。请参阅Steven Toub的文章“等待任何内容”。更具体地说,来自规范的说明如下。
一个await表达式的任务需要是可等待的。如果以下情况之一成立,则表达式是可等待的:
- t 是编译时动态类型
- t 具有名为GetAwaiter的可访问实例或扩展方法,没有参数和类型参数,并且返回类型A,其中所有以下条件均成立:
1. A 实现接口 System.Runtime.CompilerServices.INotifyCompletion(以下简称INotifyCompletion)
2. A 具有可访问的、可读的实例属性 IsCompleted,类型为 bool
3. A 具有无参数和类型参数的可访问实例方法 GetResult

请注意,这类似于foreach不需要IEnumerable而只需要返回兼容对象的GetEnumerator方法。这种鸭子类型是一种性能优化,允许值类型在不进行装箱的情况下被编译器使用。这可以用于避免在性能敏感代码中进行不必要的分配。

值得注意的是,当调用 OnCompleted 时,等待可观察对象仅返回最后一个值。如果序列中没有任何值,则会抛出异常。 - Dave Sexton

9

我认为这是因为System.Reactive.LinqIObservable上定义了一个GetAwaiter扩展方法。正如@mike z所解释的那样,这允许您等待IObservable。以下是该方法:

public static AsyncSubject<TSource> GetAwaiter<TSource>(this IObservable<TSource> source)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    return s_impl.GetAwaiter<TSource>(source);
}

返回类型 AsyncSubject<T> 实现了 INotifyCompletion 接口,并且具有属性 IsCompleted 和方法 GetResult

public sealed class AsyncSubject<T> : ISubject<T>, ISubject<T, T>, IObserver<T>, IObservable<T>, IDisposable, INotifyCompletion
{
    // Fields
    private Exception _exception;
    private readonly object _gate;
    private bool _hasValue;
    private bool _isDisposed;
    private bool _isStopped;
    private ImmutableList<IObserver<T>> _observers;
    private T _value;

    // Methods
    public AsyncSubject();
    private void CheckDisposed();
    public void Dispose();
    public AsyncSubject<T> GetAwaiter();
    public T GetResult();
    public void OnCompleted();
    public void OnCompleted(Action continuation);
    private void OnCompleted(Action continuation, bool originalContext);
    public void OnError(Exception error);
    public void OnNext(T value);
    public IDisposable Subscribe(IObserver<T> observer);

    // Properties
    public bool HasObservers { get; }
    public bool IsCompleted { get; }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接