实现AsyncCodeActivities(使用C# async / await)

6

很久以来,我一直使用以下模板编写自定义的AsyncCodeActivity类:

public sealed class MyActivity : AsyncCodeActivity<T>
{
    protected override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
    {
        var task = new Task<T>(this.Execute, state, CancellationToken.None, TaskCreationOptions.AttachedToParent);
        task.ContinueWith(s => callback(s));
        task.Start();
        return task;
    }

    protected override T EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
    {
        var task = result as Task<T>;
        if (task.Exception != null)
        {
            // Error handling. Rethrow? Cancel?
        }

        return task.Result;
    }

    private T Execute(object state)
    {
        // Logic here
        return default(T);
    }
}

我有一些关于IT技术的问题:

  1. 处理异常的正确方式是什么?重新抛出异常?将上下文设置为取消?
  2. 现在是否有一种优雅的方法可以使用async/await语法来编写它?

谢谢

1个回答

14

1)你应该从EndExecute方法中重新抛出异常。

2)我建议您创建自己的基础类型。我写了一个名为AsyncTaskCodeActivity<T>的基础类型如下:

public abstract class AsyncTaskCodeActivity<T> : AsyncCodeActivity<T>
{
    protected sealed override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
    {
        var task = ExecuteAsync(context);
        var tcs = new TaskCompletionSource<T>(state);
        task.ContinueWith(t =>
        {
            if (t.IsFaulted)
                tcs.TrySetException(t.Exception.InnerExceptions);
            else if (t.IsCanceled)
                tcs.TrySetCanceled();
            else
                tcs.TrySetResult(t.Result);

            if (callback != null)
                callback(tcs.Task);
        });

        return tcs.Task;
    }

    protected sealed override T EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
    {
        var task = (Task<T>)result;
        try
        {
            return task.Result;
        }
        catch (AggregateException ex)
        {
            ExceptionDispatchInfo.Capture(ex.InnerException).Throw();
            throw;
        }
    }

    protected abstract Task<T> ExecuteAsync(AsyncCodeActivityContext context);
}

如果您使用我的AsyncEx库,这个包装器会变得更加简单:
public abstract class AsyncTaskCodeActivity<T> : AsyncCodeActivity<T>
{
    protected sealed override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
    {
        var task = ExecuteAsync(context);
        return AsyncFactory<T>.ToBegin(task, callback, state);
    }

    protected sealed override T EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
    {
        return AsyncFactory<T>.ToEnd(result);
    }

    protected abstract Task<T> ExecuteAsync(AsyncCodeActivityContext context);
}

一旦你有了基本类型,你可以定义自己的派生类型。这里有一个使用 async/await 的例子:

public sealed class MyActivity : AsyncTaskCodeActivity<int>
{
    protected override async Task<int> ExecuteAsync(AsyncCodeActivityContext context)
    {
        await Task.Delay(100);
        return 13;
    }
}

这里有一个将CPU绑定的工作计划到线程池的示例(类似于您当前使用的模板):

public sealed class MyCpuActivity : AsyncTaskCodeActivity<int>
{
    protected override Task<int> ExecuteAsync(AsyncCodeActivityContext context)
    {
        return Task.Run(() => 13);
    }
}

评论更新:这里有一个使用取消操作的示例。我不能百分之百确定它是否正确,因为取消本身是异步的,并且AsyncCodeActivity<T>.Cancel的语义未被充分记录(即,Cancel是否应该等待活动在已取消状态下完成?在调用Cancel后成功完成活动是否可接受?)。

public abstract class AsyncTaskCodeActivity<T> : AsyncCodeActivity<T>
{
    protected sealed override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
    {
        var cts = new CancellationTokenSource();
        context.UserState = cts;
        var task = ExecuteAsync(context, cts.Token);
        return AsyncFactory<T>.ToBegin(task, callback, state);
    }

    protected sealed override T EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
    {
        try
        {
            return AsyncFactory<T>.ToEnd(result);
        }
        catch (OperationCanceledException)
        {
            if (context.IsCancellationRequested)
                context.MarkCanceled();
            else
                throw;
            return default(T); // or throw?
        }
    }

    protected override void Cancel(AsyncCodeActivityContext context)
    {
        var cts = (CancellationTokenSource)context.UserState;
        cts.Cancel();
    }

    protected abstract Task<T> ExecuteAsync(AsyncCodeActivityContext context, CancellationToken cancellationToken);
}

一个小的额外要求:我需要的异步活动应该是在 pick 活动中可能触发的,因此我还需要(优雅的)取消...它如何扩展以支持这种情况?谢谢 - fra
我不熟悉WF取消操作,但我估计你可以在BeginExecute中创建一个CancellationTokenSource(并将其保存在上下文中),将令牌传递给ExecuteAsync。然后重写Cancel方法以从上下文中获取CancellationTokenSource,取消它,并调用MarkCanceled方法。 - Stephen Cleary
感谢提供更新的示例。 在我的测试中,它在选择活动中立即取消除“获胜者”之外的所有分支时表现非常出色。 - fra
1
当我在执行await之后,在ExecuteAsync方法中尝试使用context时,我遇到了异常信息:“只有在传递给函数的范围内才能访问ActivityContext。” - Kris Ivanov
@KrisIvanov:我没有见过那个异常,但我也没有大量使用异步活动。我建议您浏览代码,并确保您没有在任何地方使用async void。如果这不是问题,请将最小重现作为问题发布。 - Stephen Cleary
@KrisIvanov:发现了活动上下文的问题。请查看此更新:https://dev59.com/b4Pba4cB1Zd3GeqPwb-9#26061482 - Stephen Cleary

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接