您编写的代码几乎可以并行运行可观察对象。如果您将观察者编写为以下方式:
public class Subscriber : IObserver<int>
{
public void OnNext(int a)
{
Console.WriteLine("{0} on {1} at {2}",
a,
Thread.CurrentThread.ManagedThreadId,
DateTime.Now.ToString());
}
public void OnError(Exception e)
{ }
public void OnCompleted()
{ }
}
接着运行以下代码:
var observable =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Select(x => (int)x)
.Take(5)
.ObserveOn(Scheduler.ThreadPool);
var sub1 = new Subscriber();
var sub2 = new Subscriber();
observable.Subscribe(sub1);
observable.Subscribe(sub2);
Thread.Sleep(10000);
将会产生以下结果:
0 on 28 at 2011/10/20 00:13:49
0 on 16 at 2011/10/20 00:13:49
1 on 29 at 2011/10/20 00:13:50
1 on 22 at 2011/10/20 00:13:50
2 on 27 at 2011/10/20 00:13:51
2 on 29 at 2011/10/20 00:13:51
3 on 27 at 2011/10/20 00:13:52
3 on 19 at 2011/10/20 00:13:52
4 on 27 at 2011/10/20 00:13:53
4 on 27 at 2011/10/20 00:13:53
它已经在不同的线程上并行运行订阅。
我使用的重要方法是.ObserveOn
扩展方法 - 它使这个工作成为可能。
你应该记住,观察者通常不共享相同的可观测实例。订阅可观测对象就像有效地将从可观测对象源到观察者的唯一“链”连接起来。这与对可枚举项两次调用GetEnumerator
类似,你不会共享相同的枚举器实例,而会得到两个唯一的实例。
现在,我想描述一下我所说的链是什么意思。我将给出Reflector.NET提取的代码Observable.Generate
和Observable.Where
来说明这一点。
以这段代码为例:
var xs = Observable.Generate(0, x => x < 10, x => x + 1, x => x);
var ys = xs.Where(x => x
ys.Subscribe(y => { /* produces 0, 2, 4, 6, 8 */ });
在底层,
Generate
和
Where
都会创建内部Rx类
AnonymousObservable<T>
的新实例。对于
AnonymousObservable<T>
的构造函数需要一个
Func<IObserver<T>, IDisposable>
委托,每当它接收到对
Subscribe
的调用时就会使用这个委托。下面是从Reflector.NET上稍微整理过的
Observable.Generate<T>(...)
代码:
public static IObservable<TResult> Generate<TState, TResult>(
TState initialState,
Func<TState, bool> condition,
Func<TState, TState> iterate,
Func<TState, TResult> resultSelector,
IScheduler scheduler)
else
flag = condition(state);
if (flag)
}
catch (Exception exception)
if (flag)
else
});
});
}
Action self
参数是一个递归调用,以迭代输出值。您会注意到,在此代码中,没有存储 observer
或将值粘贴到多个观察者中。此代码对于每个新的观察者运行一次。
从 Reflector.NET 清理稍微的代码 Observable.Where<T>(...)
如下:
public static IObservable<TSource> Where<TSource>(
this IObservable<TSource> source,
Func<TSource, bool> predicate)
{
return new AnonymousObservable<TSource>(observer =>
source.Subscribe(x =>
{
bool flag;
try
{
flag = predicate(x);
}
catch (Exception exception)
{
observer.OnError(exception);
return;
}
if (flag)
{
observer.OnNext(x);
}
}, ex => observer.OnError(ex), () => observer.OnCompleted));
}
再次说明,这段代码不能跟踪多个观察者。它有效地调用了Subscribe
方法,将自己的代码作为观察者传递给基础的source
可观察对象。
在上面的例子中,您应该看到,订阅Where
会创建一个对Generate
的订阅,因此这是一系列可观察对象的链接。实际上,它是在一系列AnonymousObservable
对象上链接订阅调用。
如果您有两个订阅,那么您就有两条链。如果您有1,000个订阅,那么您就有1,000条链。
现在,只是作为一个侧面的注意事项 - 即使有IObservable<T>
和IObserver<T>
接口 - 您非常非常少地需要在自己的类中实际实现这些接口。内置的类和运算符处理了99.99%的情况。这有点像IEnumerable<T>
- 你有多少次需要自己实现这个接口呢?
如果需要进一步解释,请告诉我是否有帮助。