我现在处于一个情况中,我有一个任务列表正在处理(启用驱动器、改变位置、等待停止、禁用)。
'wait for' 监视一个 IObservable,我想要等待它(这样我就可以通过 ContinueWith 和其他任务进行线程处理)。
我最初在订阅者的 OnNext 处理中使用了以下任务,但那只是丑陋的。现在我想到了这个扩展方法:
'wait for' 监视一个 IObservable,我想要等待它(这样我就可以通过 ContinueWith 和其他任务进行线程处理)。
我最初在订阅者的 OnNext 处理中使用了以下任务,但那只是丑陋的。现在我想到了这个扩展方法:
public static Task<T> WaitFor<T>(this IObservable<T> source, Func<T, bool> pred)
{
var tcs = new TaskCompletionSource<T>();
source
.Where(pred)
.DistinctUntilChanged()
.Take(1) //OnCompletes the observable, subscription will self-dispose
.Subscribe(val => tcs.TrySetResult(val),
ex => tcs.TrySetException(ex),
() => tcs.TrySetCanceled());
return tcs.Task;
}
(更新,根据svick的建议处理了OnCompleted
和OnError
)
问题:
- 这是好还是坏?
- 我是否忽略了已存在的扩展名,可以完成此操作?
Where
和DistinctUntilChanged
的顺序正确吗?(我认为是)
source
出错或完成的情况吗? - svick