等待可观察对象的结果

28

所以在C# 4.0的不幸日子里,我创建了以下"WorkflowExecutor"类,通过对IEnumerable的"yield return"继续进行黑客攻击,允许在GUI线程中执行异步工作流,等待可观察对象。因此,以下代码将在button1Click时启动一个简单的工作流程,更新文本,等待您单击button2,然后在1秒后循环。

public sealed partial class Form1 : Form {
    readonly Subject<Unit> _button2Subject = new Subject<Unit>();
    readonly WorkflowExecutor _workflowExecutor = new WorkflowExecutor();

    public Form1() {
        InitializeComponent();
    }

    IEnumerable<IObservable<Unit>> CreateAsyncHandler() {
        Text = "Initializing";
        var scheduler = new ControlScheduler(this);
        while (true) {
            yield return scheduler.WaitTimer(1000);
            Text = "Waiting for Click";
            yield return _button2Subject;
            Text = "Click Detected!";
            yield return scheduler.WaitTimer(1000);
            Text = "Restarting";
        }
    }

    void button1_Click(object sender, EventArgs e) {
        _workflowExecutor.Run(CreateAsyncHandler());
    }

    void button2_Click(object sender, EventArgs e) {
        _button2Subject.OnNext(Unit.Default);
    }

    void button3_Click(object sender, EventArgs e) {
        _workflowExecutor.Stop();
    }
}

public static class TimerHelper {
    public static IObservable<Unit> WaitTimer(this IScheduler scheduler, double ms) {
        return Observable.Timer(TimeSpan.FromMilliseconds(ms), scheduler).Select(_ => Unit.Default);
    }
}

public sealed class WorkflowExecutor {
    IEnumerator<IObservable<Unit>> _observables;
    IDisposable _subscription;

    public void Run(IEnumerable<IObservable<Unit>> actions) {
        _observables = (actions ?? new IObservable<Unit>[0]).GetEnumerator();
        Continue();
    }

    void Continue() {
        if (_subscription != null) {
            _subscription.Dispose();
        }
        if (_observables.MoveNext()) {
            _subscription = _observables.Current.Subscribe(_ => Continue());
        }
    }

    public void Stop() {
        Run(null);
    }
}

这个想法中的巧妙部分,使用“yield”连续来完成异步工作,是从Daniel Earwicker的AsyncIOPipe想法中借鉴而来:http://smellegantcode.wordpress.com/2008/12/05/asynchronous-sockets-with-yield-return-of-lambdas/,然后我在其之上添加了响应式框架。

现在,我尝试使用C# 5.0中的async特性进行重写,但似乎应该很容易做到。当我将observables转换为tasks时,它们只运行一次,并且while循环第二次会崩溃。任何帮助修复此问题的建议都将不胜感激。

除此以外,async/await机制能给我什么,WorkflowExecutor却不能?是否有任何我可以使用async/await实现而非WorkflowExecutor(代码量相似)的功能?


你是如何将其转换为“Task”的?这个界面为什么会崩溃? - svick
1
await相对于这种异步方式有许多优势,但其中一个重要的区别是从可等待对象返回。例如:string s = await client.DownloadStringAsync(url); - svick
2个回答

39

正如James所提到的,你可以从Rx v2.0 Beta开始等待一个IObservable<T>序列。 它的行为是返回最后一个元素(在OnCompleted之前),或抛出观察到的OnError。 如果序列不包含任何元素,则会抛出InvalidOperationException。

请注意,使用此方法,您可以获得所有其他期望的行为:

  • 通过等待xs.FirstAsync()来获取第一个元素
  • 通过等待xs.SingleAsync()确保仅有单个值
  • 当您可以接受空序列时,请等待xs.DefaultIfEmpty()
  • 要获取所有元素,请等待xs.ToArray()或await xs.ToList()

您甚至可以执行更多高级操作,例如使用Do和Scan计算聚合结果并观察中间值:

var xs = Observable.Range(0, 10, Scheduler.Default);

var res = xs.Scan((x, y) => x + y)
            .Do(x => { Console.WriteLine("Busy. Current sum is {0}", x); });

Console.WriteLine("Done! The sum is {0}", await res);

3
感谢分享。最近在一个项目中看到等待一个 IObservable<T> 也能正常构建,这正是我正在寻找的信息。 - jpierson
1
在这里想提一下,由于谷歌搜索会导向这个答案,新的扩展ToTask也是这样的行为。它将具体执行_tcs.TrySetException(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));。这并不明显,我在追踪我的.First*调用数小时后才意识到可观察对象本身在我预期的情况下抛出了异常。 - bokibeg

33

正如您所注意到的,Task非常适用于一次性使用,而Observable则适用于“事件流”。在Rx团队关于2.0 Beta的博客文章中有一个二维表格,可以很好地思考这个问题(在我看来):

2x2 chart for task vs observable

根据情况的不同(一次性使用还是“事件流”),保留Observable可能更有意义。

如果您可以升级到Reactive 2.0 Beta版本,则可以使用“await”来等待Observables。例如,我自己尝试编写一个类似于您代码的“async/await”(近似)版本:

public sealed partial class Form1 : Form
{
    readonly Subject<Unit> _button2Subject = new Subject<Unit>();

    private bool shouldRun = false;

    public Form1()
    {
        InitializeComponent();
    }

    async Task CreateAsyncHandler()
    {
        Text = "Initializing";
        while (shouldRun)
        {
            await Task.Delay(1000);
            Text = "Waiting for Click";
            await _button2Subject.FirstAsync();
            Text = "Click Detected!";
            await Task.Delay(1000);
            Text = "Restarting";
        }
    }

    async void button1_Click(object sender, EventArgs e)
    {
        shouldRun = true;
        await CreateAsyncHandler();
    }

    void button2_Click(object sender, EventArgs e)
    {
        _button2Subject.OnNext(Unit.Default);
    }

    void button3_Click(object sender, EventArgs e)
    {
        shouldRun = false;
    }
}

“Task” 是一次性使用的,但您可以等待不是 “Task” 的东西。因此,应该可以创建一个可等待对象,可以表示整个 IObservable<T>,而不仅仅是一个项目。 - svick
这就是我在代码示例中所做的。使用 Rx 2.0,您可以等待 observables。默认行为是返回 observable 的最后一个元素,这就是为什么它执行 FirstAsync。 - James Manning

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接