如何为 Dispatcher 获取一个 TaskScheduler?

20

我的应用程序有多个 Dispatcher (又称GUI线程,消息泵),以确保GUI的某个缓慢、无响应的部分不会对整个应用程序产生太大影响。我还经常使用 Task

目前,我有一些代码条件性地在 TaskScheduler Dispatcher 上运行 Action ,然后直接返回一个 Task 或通过手动创建一个 TaskCompletionSource 来返回一个 Task 。然而,这种拆分式设计使得处理取消、异常等问题变得比我想象的要复杂得多。我希望在任何地方都使用 Task ,而不是 DispatcherOperation 。为此,我需要在调度程序上安排任务——但如何实现呢?

如何获得特定 Dispatcher TaskScheduler

编辑:在下面的讨论之后,我决定采用以下实现方式:

public static Task<TaskScheduler> GetScheduler(Dispatcher d) {
    var schedulerResult = new TaskCompletionSource<TaskScheduler>();
    d.BeginInvoke(() => 
        schedulerResult.SetResult(
            TaskScheduler.FromCurrentSynchronizationContext()));
    return schedulerResult.Task;
}
6个回答

16

步骤1:创建一个扩展方法:

public static Task<TaskScheduler> ToTaskSchedulerAsync (
    this Dispatcher dispatcher,
    DispatcherPriority priority = DispatcherPriority.Normal) {

    var taskCompletionSource = new TaskCompletionSource<TaskScheduler> ();
    var invocation = dispatcher.BeginInvoke (new Action (() =>
        taskCompletionSource.SetResult (
            TaskScheduler.FromCurrentSynchronizationContext ())), priority);

    invocation.Aborted += (s, e) =>
        taskCompletionSource.SetCanceled ();

    return taskCompletionSource.Task;
}

步骤2:使用扩展方法:

旧语法

var taskSchedulerAsync = Dispatcher.CurrentDispatcher.ToTaskSchedulerAsync ();
var taskFactoryAsync = taskSchedulerAsync.ContinueWith<TaskFactory> (_ =>
    new TaskFactory (taskSchedulerAsync.Result), TaskContinuationOptions.OnlyOnRanToCompletion);
// this is the only blocking statement, not needed once we have await
var taskFactory = taskFactoryAsync.Result;
var task = taskFactory.StartNew (() => { ... });

新语法:

var taskScheduler = await Dispatcher.CurrentDispatcher.ToTaskSchedulerAsync ();
var taskFactory = new TaskFactory (taskScheduler);
var task = taskFactory.StartNew (() => { ... });

1
Aborted 事件处理得很好!唯一剩下的缺点是任务计划程序是异步创建的;也就是说,您必须等待调度程序运行调用后才能安排任务(这不是一个主要问题,因为您可以使用 ContinueWith,但有点不方便)。 - Eamon Nerbonne
好的,await在这里实际上做的是相同的事情,即导致在Dispatcher运行后异步执行的Task。具体来说,这意味着当包含方法的执行完成时,任务调度程序可能尚不可用(即在.NET 4.5中问题仍然存在)。这不是关键问题;无论如何,只要创建调度程序的唯一方式是在同步上下文本身上(这是一个奇怪的API限制,但它就是这样...), 就不能解决它。 - Eamon Nerbonne
1
将默认优先级设置为“发送”而不是“普通”的思考背后是什么? - Factor Mystic
1
@FactorMystic 看起来 George 采纳了你的反馈。现在我们进入了“长尾”领域,也许可以删除你的评论? - yzorg
1
@FactorMystic:“Send”是可用的最高优先级。它的名称可能来自于Win32消息循环中的“SendMessage”/“PostMessage”对,其中“Post”表示“将消息排队”,而“Send”表示“同步处理消息”,这通常意味着“立即执行”,这与“最高优先级”的概念相当吻合。说到这个问题,由于“延迟TaskScheduler创建”可能是一个问题,我们希望它“立即”完成,因此具有最高优先级。当然,这并不能解决问题,只能稍微降低发生的几率。 - quetzalcoatl

9
很遗憾,没有内置的方法来做到这一点。没有专门用于将Dispatcher封装在TaskScheduler中的内置类——我们最接近的是封装SynchronizationContext的类。而从SynchronizationContext构建TaskScheduler的唯一公共API是Paul Michalik提到的TaskScheduler.FromCurrentSynchronizationContext。正如你所观察到的那样,只有当你已经在相关的同步上下文(即相关的调度程序线程)中时,它才起作用。
因此,你有三个选择:
  1. 安排你的代码,使需要相关调度程序的调度器的类有机会在这些调度程序的线程上运行,以便你可以按预期使用TaskScheduler.FromCurrentSynchronizationContext
  2. 使用Dispatcher.BeginInvoke在调度程序线程上运行一些代码,并在该代码中调用TaskScheduler.FromCurrentSynchronizationContext。(换句话说,如果你不能自然地安排第一种情况发生,就强制它发生。)
  3. 编写自己的任务调度程序。

我只能接受一个答案,但这也是我不接受这个答案的唯一原因 - 这几乎就是我最终所做的。谢谢! - Eamon Nerbonne

4
请看TaskScheduler.FromCurrentSynchronizationContext。即使应用程序强制使用特定的线程模型,任务框架也提供了一种非常灵活的方式来配置计算绑定操作的执行。
编辑:
嗯,从您发布的内容中很难得到更明确的信息。我理解您正在运行一种多视图应用程序,每个视图都有单独的调度程序,对吗?由于所有调度都归结为获取SynchronizationContext并将其Post,因此您可以在某些时候获取正确的TaskScheduler(具有正确的SynchronizationContext),当您的视图(们)拥有其中一个时。在配置任务时获取TaskScheduler的简单方法如下:
 // somewhere on GUI thread you wish to invoke
 // a long running operation which returns an Int32 and posts
 // its result in a control accessible via this.Text
 (new Task<Int32>(DoSomeAsyncOperationReturningInt32)
      .ContinueWith(tTask => this.Text = tTask.Result.ToString(),
                    TaskScheduler.FromCurrentSynchronizationContext)).Start();

不确定这是否有帮助,如果您经常使用任务,那么您可能已经知道...


我想我明白你的意思 - 但是FromCurrentSynchronizationContext不接受Dispatcher作为参数,而我有好几个。如果您能明确解决方案,您的答案将对其他人有用,我会接受它;-) - Eamon Nerbonne
嗯,我尝试更加明确一些,可以看一下编辑后的答案。此外,考虑Ian的建议,可以自己编写一个TaskScheduler,这样可以更适当地封装上下文切换... - Paul Michalik
那确实有帮助,但您的示例包括一些关于Text和Int32的不必要的部分;我进行了编辑以反映我最终使用的实际方法。可能有点过度干涉,请根据您的意见修正答案,如果我改得太多请谅解。 - Eamon Nerbonne
是的,你完全重写了它 :-) 我的意图是展示如何获取同步上下文并在其上安排继续任务。如果这基本上解决了你的问题,我宁愿将片段剪切并粘贴到你的问题中。 - Paul Michalik

4
您可以将整个功能写在一行中:
public static Task<TaskScheduler> ToTaskSchedulerAsync(this Dispatcher dispatcher,
                           DispatcherPriority priority = DispatcherPriority.Normal)
{
    return dispatcher.InvokeAsync<TaskScheduler>(() =>
         TaskScheduler.FromCurrentSynchronizationContext(), priority).Task;
}

对于那些满意默认UI线程的人来说,以下内容可能足以应付:

var ts = Application.Current.Dispatcher.Invoke<TaskScheduler>(() => TaskScheduler.FromCurrentSynchronizationContext());

1
我当时写那个程序的时候,.NET 4.5 还不存在,所以我不可能使用它。但是现在你可以用了! :-) - Eamon Nerbonne
没错,我发完回复几个小时后也想到了这一点。 - SmallBizGuy

1
虽然没有专门用于将Dispatcher包装在TaskScheduler中的内置类,但你可以自己编写一个。完成此任务的示例实现如下所示:
internal class DispatcherTaskScheduler : TaskScheduler
{
    private readonly Dispatcher _dispatcher;
    private readonly SendOrPostCallback _dispatcherCallback;

    public DispatcherTaskScheduler(Dispatcher dispatcher)
    {
        _dispatcher = dispatcher;
        // Callback for dispatcher.
        _dispatcherCallback = (state) =>
        {
            _ = TryExecuteTask((Task)state!);
        };
    }

    public override int MaximumConcurrencyLevel
    {
        // Dispatcher operates on one thread only.
        get => 1;
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        // Whole method is only for debugging purposes. No need to implement it.
        return Array.Empty<Task>();
    }

    protected override void QueueTask(Task task)
    {
        // Schedule the task for execution.
        _ = _dispatcher.BeginInvoke(DispatcherPriority.Normal, _dispatcherCallback, task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // If we are in the right thread then execute the task.
        if (_dispatcher.CheckAccess())
            return TryExecuteTask(task);
        return false;
    }
}

0

这就是我经常将异步函数调用转换为同步函数调用的方法(向互联网上某个人致敬):

    public static class ThreadingUtils 
    {
         public static TaskScheduler GetScheduler(Dispatcher dispatcher)
         {
             using (var waiter = new ManualResetEvent(false))
             {
                 TaskScheduler scheduler = null;
                 dispatcher.BeginInvoke(new Action(() =>
                 {
                     scheduler = 
                         TaskScheduler.FromCurrentSynchronizationContext();
                     waiter.Set();
                 }));
                 waiter.WaitOne();
                 return scheduler;
             }
         }
    }

变量:

    if (!waiter.WaitOne(2000))
    { 
        //Timeout connecting to server, log and exit
    }

2
虽然这样做可以工作,但既然已经在使用TPL,为什么还要使用手动等待呢?TaskCompletionSource方法可以让您在需要时阻塞,如果不需要则不会阻塞,并且代码更短。 - Eamon Nerbonne
正确,我也可以调用Dispatcher.Invoke(...),它在内部执行完全相同的操作... 我的观点是展示适用于任何异步调用的通用模式... 另外,超时处理怎么办? - agroskin
还有,返回多个结果集怎么办?还有多个等待块呢?顺便说一句,我认为这就是 Microsoft 实现 "async" 和 "await" 功能的方式... - agroskin
不,这与在内部使用TaskCompletionSource并不相同;这可能不需要锁定或事件,具体取决于等待结果的内容以及控制等待的TaskScheduler。而且它几乎肯定不需要重量级的操作系统级ManualResetEvent——这是一个多进程同步工具,而不仅仅是一个多线程同步工具。 - Eamon Nerbonne

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接