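Another approach, and a fairly drastic one compared with the others, so I'm posting it as a separate answer.

Here is a version with various fun debugging lines: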
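
    using System;
    using System.Collections.Generic;
    using System.Reactive.Disposables;
    using System.Reactive.Linq;

    public static IObservable<T> FirstWithValues<T>(this IEnumerable<IObservable<T>> sources)
    {
        return Observable.Create<T>(obs =>
        {
            var disp = new SerialDisposable();
            var sourceWalker = sources.GetEnumerator();

            // Subscribes to the next source, or completes if none are left.
            Action subscribeNext = null;
            subscribeNext = () =>
            {
                if (!sourceWalker.MoveNext())
                {
                    Console.WriteLine("No more sources, so ending");
                    obs.OnCompleted();
                    return;
                }

                bool hadValues = false;
                // Register the slot first, so a source that completes synchronously
                // cannot overwrite the subscription made for its successor.
                var inner = new SingleAssignmentDisposable();
                disp.Disposable = inner;
                // A fresh observer per source, so a completed observer is never reused.
                inner.Disposable = sourceWalker.Current.Subscribe(
                    v =>
                    {
                        Console.WriteLine("Got value on source:" + v);
                        hadValues = true;
                        obs.OnNext(v);
                    },
                    ex =>
                    {
                        Console.WriteLine("Error on source, passing to observer");
                        obs.OnError(ex);
                    },
                    () =>
                    {
                        if (hadValues)
                        {
                            Console.WriteLine("Source completed, had values, so ending");
                            obs.OnCompleted();
                        }
                        else
                        {
                            Console.WriteLine("Source completed, no values, so moving to next source");
                            subscribeNext();
                        }
                    });
            };

            subscribeNext();
            // Unsubscribing disposes the current subscription and the enumerator.
            return new CompositeDisposable(disp, sourceWalker);
        });
    }
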
Usage:

    var query = new[]
    {
        Observable.Defer(() => GetSource("A")),
        Observable.Defer(() => GetSource("B")),
        Observable.Defer(() => GetSource("C")),
    }.FirstWithValues();

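The `GetSource` helper used in the query is not shown in this answer. As a rough sketch only, assuming it logs its invocation and then returns either a single article or an empty sequence, it might look like the following. The `Demo` class and the `WithArticles` set are hypothetical names introduced purely for illustration, and the snippet assumes the `FirstWithValues` extension method is declared in a static class in the same project.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

static class Demo
{
    // Hypothetical: names of the sources that currently have an article.
    static readonly HashSet<string> WithArticles = new HashSet<string> { "B", "C" };

    // Hypothetical stand-in for the GetSource helper used in the query.
    static IObservable<string> GetSource(string name)
    {
        Console.WriteLine("Source " + name + " invoked");
        return WithArticles.Contains(name)
            ? Observable.Return("Article from " + name)
            : Observable.Empty<string>();
    }

    static void Main()
    {
        var query = new[]
        {
            Observable.Defer(() => GetSource("A")),
            Observable.Defer(() => GetSource("B")),
            Observable.Defer(() => GetSource("C")),
        }.FirstWithValues();

        // Nothing runs until a subscriber attaches; print each article received.
        query.Subscribe(article => Console.WriteLine(article));
    }
}
```

Wrapping each call in `Observable.Defer` matters here: a source is only invoked when `FirstWithValues` actually reaches it, so later sources are never touched once an earlier one has produced a value.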
Output, for three runs in which A, then B, then C is the first source to produce a value:

    Source A invoked
    Got value on source:Article from A
    Article from A
    Source completed, had values, so ending

    Source A invoked
    Source completed, no values, so moving to next source
    Source B invoked
    Got value on source:Article from B
    Article from B
    Source completed, had values, so ending

    Source A invoked
    Source completed, no values, so moving to next source
    Source B invoked
    Source completed, no values, so moving to next source
    Source C invoked
    Got value on source:Article from C
    Article from C
    Source completed, had values, so ending

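[...] IObservable? - JerKimball

[...] IObservable. - David Pfeffer

[...] IObservable doesn't make much sense. A query means "I ask a question and expect an immediate answer." An IObservable only accepts subscriptions. It calls back its subscribers at a time of its own choosing. It is the active pusher, not a passive object you can pull from whenever you like. When interacting with an IObservable, you are the observer, the passive party! - Theodor Zoulias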