'WaitFor'一个可观察对象

11
我现在处于一个情况中,我有一个任务列表正在处理(启用驱动器、改变位置、等待停止、禁用)。
'wait for' 监视一个 IObservable,我想要等待它(这样我就可以通过 ContinueWith 和其他任务进行线程处理)。
我最初在订阅者的 OnNext 处理中使用了以下任务,但那只是丑陋的。现在我想到了这个扩展方法:
public static Task<T> WaitFor<T>(this IObservable<T> source, Func<T, bool> pred)
{
    var tcs = new TaskCompletionSource<T>();
    source
        .Where(pred)
        .DistinctUntilChanged()
        .Take(1)  //OnCompletes the observable, subscription will self-dispose
        .Subscribe(val => tcs.TrySetResult(val),
                    ex => tcs.TrySetException(ex),
                    () => tcs.TrySetCanceled());

    return tcs.Task;
}

(更新,根据svick的建议处理了OnCompletedOnError)

问题:

  • 这是好还是坏?
  • 我是否忽略了已存在的扩展名,可以完成此操作?
  • WhereDistinctUntilChanged的顺序正确吗?(我认为是)

你难道不应该处理 source 出错或完成的情况吗? - svick
@svick,嗯,好问题。在我的特定用例中,我正在观察“重复”,所以我认为那不会完成。但是如果我想要这个扩展可重用,我应该处理那些情况。 - Benjol
2个回答

13

至少我会将这个扩展方法改为这样:

public static Task<T> WaitFor<T>(this IObservable<T> source, Func<T, bool> pred)
{
    return
        source
            .Where(pred)
            .DistinctUntilChanged()
            .Take(1)
            .ToTask();
}

使用.ToTask()比引入TaskCompletionSource要好得多。您需要引用System.Reactive.Threading.Tasks命名空间才能获取.ToTask()扩展方法。
此外,在此代码中,DistinctUntilChanged是多余的。您只会获得一个值,因此默认情况下必须是不同的。
现在,我的下一个建议可能有点有争议。这个扩展是一个坏主意,因为它隐藏了正在进行的真正语义。
如果我有这两个代码片段:
var t = xs.WaitFor(x => x > 10);

或者:

var t = xs.Where(x => x > 10).Take(1).ToTask();

我通常更喜欢第二个片段,因为它清楚地向我展示了正在发生的事情 - 我不需要记住WaitFor的语义。

除非您将WaitFor的名称更加描述性 - 例如TakeOneAsTaskWhere - 否则您会使使用运算符的代码变得不清晰,并使代码更难管理。

以下内容难道不会更容易记住语义吗?

var t = xs.TakeOneAsTaskWhere(x => x > 10);

我的结论是Rx操作符应该被组合使用,而不是被封装起来,但如果你要封装它们,那么它们的含义必须是清晰的。
希望这可以帮助你。

谢谢。我在网上各处看到过 ToTask,但是(懒惰地)认为它已经丢失在某个过去或之前的版本中了。我认为你说得对,使用 ToTask 代码变得不那么丑陋,因此扩展方法的需求也减少了。 - Benjol
我认为封装在应用一组组成部分并且这些部分放在一起时具有更高意义的情况下是不错的。但是在这里,我认为没有足够的“更高意义”值得封装 - 封装的名称必须比组合的集合更有意义。如果您使用xs.FirstAsync(...).ToTask()替换xs.Where(...).Take(1).ToTask(),那么这就非常简洁明了并且目的清晰。 - Niall Connaughton

3
不确定,但从阅读 Rx 2.0 beta 博客文章来看,如果你可以使用 async/await,那么你可以“return await source.FirstAsync(pred)”或者不使用async,“return source.FirstAsync(pred).ToTask()”。详情请参见此链接。下图是使用 Rx 2.0 和 FirstAsync 的 Linqpad 截图:sshot of linqpad using rx 2.0 and firstasync

如果我升级的话,我会记住这个的。目前我正在尝试在没有async/await的情况下看看它有多难。 - Benjol

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接