如何在捕获的ExecutionContext上运行异步委托

8

正如Stephen Toub在这篇文章中所解释的那样,当您向ActionBlock提交消息时,可以在调用ActionBlock.Post之前执行ExecutionContext.Capture,将同时包含消息和ExecutionContext的DTO传递到块中,然后在消息处理委托内使用ExecutionContext.Run来在捕获的上下文中运行委托:

public sealed class ContextFlowProcessor<T> {
    private struct MessageState {
        internal ExecutionContext Context;
        internal T Value;
    }

    private readonly ITargetBlock<MessageState> m_block;

    public ContextFlowProcessor(Action<T> action) {
        m_block = new ActionBlock<MessageState>(ms =>
        {
            if (ms.Context != null)
                using (ms.Context) ExecutionContext.Run(ms.Context, s => action((T)s), ms.Value);
            else 
                action(ms.Value);
        });
    }

    public bool Post(T item) {
        var ec = ExecutionContext.Capture();
        var rv = m_block.Post(new MessageState { Context = ec, Value = item });
        if (!rv) ec.Dispose();
        return rv;
    }

    public void Done() { m_block.DeclinePermanently(); }

    public Task CompletionTask { get { return m_block.CompletionTask; } }

当消息处理程序内部的逻辑是同步的时,它能够很好地工作。但是,我该如何在捕获的ExecutionContext上运行一个异步逻辑?我需要像这样的东西:

m_block = new ActionBlock<MessageState>(async ms =>
{
      // omitting the null context situation for brevity
      using (ms.Context)
      {
         await ExecutionContext.Run(ms.Context, async _ => { callSomethingAsync(ms.Value) });
      }
});

显然,这段代码不能编译,因为ExecutionContext.Run不支持异步委托(而ActionBlock却支持)-那我该怎么办呢?

提供的链接是关于TPL Dataflow预发布版本的,该版本与当前API不兼容(例如方法DeclinePermanently,属性CompletionTask)。因此,提供的信息可能不准确。我的实验表明,默认情况下会捕获ExecutionContext,而ContextFlowProcessor的行为与简单的ActionBlock相同。您有展示差异的示例吗? - Theodor Zoulias
不是的,我看了当前的源代码,它只在启动新任务时(隐式地)捕获ExecutionContext,这发生在您发布第一条消息或长时间间隔后的第一条消息,但不适用于排队到正在运行的任务循环的后续消息。 - Andriy Volkov
此处所解释的那样,async/await关键字在幕后使用了ExecutionContextasync/await只是一些基础设施,帮助模拟异步编程中的同步语义。因此,当您使用ExecutionContext时,这意味着您需要手动处理事情。我认为,在它们自己基于ExecutionContext.Run并且正在使用它的情况下,ExecutionContext.Run支持async/await是没有意义的。 - Ali Zeinali
1
使用 Task.Run 怎么样? - Victor P
1个回答

2

如果您能提供一个自包含的例子,我们可以尝试重现问题,这样我们可能会提供更好的答案。话虽如此,使用简单的自定义同步上下文,手动控制ExecutionContext(或者说它的副本)在await继续中的流程是可能的。以下是一个示例(警告 - 几乎未经测试!):

Original Answer翻译成"最初的回答"

// using EcFlowingSynchronizationContext:

m_block = new ActionBlock<MessageState>(async ms =>
{
      using (ms.Context)
      using (var sc = new EcFlowingSynchronizationContext(ms.Context))
      {
         await sc.Run(async _ => { await callSomethingAsync(ms.Value); });
      }
});

// EcFlowingSynchronizationContext: flow execution context manually 

public class EcFlowingSynchronizationContext : SynchronizationContext, IDisposable
{
    private readonly ExecutionContext _ec;
    private readonly TaskScheduler _taskScheduler;

    public EcFlowingSynchronizationContext(ExecutionContext sourceEc) 
    {
        TaskScheduler ts = null;
        ExecutionContext ec = null;

        ExecutionContext.Run(sourceEc, _ =>
        {
            var sc = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(this);
            try
            {
                ts = TaskScheduler.FromCurrentSynchronizationContext();
                // this will also capture SynchronizationContext.Current,
                // and it will be flown by subsequent ExecutionContext.Run
                ec = ExecutionContext.Capture();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(sc);
            }
        }, null);

        _ec = ec;
        _taskScheduler = ts;
    }

    private void Execute(SendOrPostCallback d, object state)
    {
        using (var ec = _ec.CreateCopy())
        {
            ExecutionContext.Run(ec, new ContextCallback(d), state);
        }
    }

    public Task Run(Func<Task> action, CancellationToken token = default(CancellationToken))
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, _taskScheduler).Unwrap();
    }

    public Task<TResult> Run<TResult>(Func<Task<TResult>> action, CancellationToken token = default(CancellationToken))
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, _taskScheduler).Unwrap();
    }

    public override void Post(SendOrPostCallback d, object state)
    {
        ThreadPool.UnsafeQueueUserWorkItem(s => Execute(d, s), state);
    }

    public override void Send(SendOrPostCallback d, object state)
    {
        Execute(d, state);
    }

    public override SynchronizationContext CreateCopy()
    {
        return this;
    }

    public void Dispose()
    {
        _ec.Dispose();
    }
}

注意,您应该仅使用CallContext.LogicalSetData(或AsyncLocal<T>)存储不可变值。也就是说,如果您需要存储在调用方和调用者之间的异步流中可能会更改的内容,并能够跟踪调用者中的更改,则将其作为类的属性,然后存储该类的实例。确保该类也是线程安全的,因为最终您可以有许多原始执行上下文的并发分支。
有关详细信息,请参阅Stephen Cleary的博客文章:Implicit Async Context ("AsyncLocal")"Eliding Async and Await"

所以基本上,Task.Run带有一些额外的细节? - Andriy Volkov
@zvolkov,这根本不使用Task.Run。 我只是为了方便而使EcFlowingSynchronizationContext.Run看起来相似。 - noseratio - open to work
我的意思是,你的sc.Run使用Task.Factory.StartNew来执行我所询问的部分(运行异步代码),并且它在目标EC上运行的方式是通过从目标EC.Run内获取FromCurrentSynchronizationContext任务调度程序 - 但我想知道是否可以直接在我的目标EC.Run内使用Task.Factory.StartNew,而不创建这个额外的类? - Andriy Volkov
@zvolkov 如果你像那样调用 Task.Factory.StartNew,你将提供什么 TaskScheduler?如果你给它 TaskScheduler.Default,你将得到默认的 EC 传播行为,这是你一开始不满意的。使用自定义同步上下文和自定义任务计划程序的整个过程是为了改变它的传播方式。这个实现是否按照你想要的方式工作?还是我漏掉了什么? - noseratio - open to work
不,我只是想理解其基本原理。我需要进行测试。 - Andriy Volkov
显示剩余5条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接