创建冰冷的TaskCompletionSource?

10

我正在编写一个库,其中包含基于 .Net Tasks 的调度功能(不是标准的TaskSchedulerIScheduler...)。我正在使用TaskCompletionSource,并且Task.Status对于表示底层操作的状态至关重要,包括TaskStatus.Created,即已创建但尚未启动。我知道返回的Tasks通常应该是即时的(hot),但对于我手动控制的代理Tasks,我确实希望它们最初为Created

对我来说不幸的是,TaskCompletionSource.Task的初始状态是WaitingForActivation,即它已经过了Created状态。换句话说,TaskCompletionSource支持两个状态,但我需要三个状态:

问题:如何获取一个Task,我可以手动设置为种不同的状态?也就是说,Task.Status可以设置为:

1) Created
2) 一个WaitingForActivation/WaitingForChildrenToComplete/WaitingToRun/Running
3) RanToCompletion/Canceled/Faulted中的任何一个状态。

下面的代码可以理解为类型不匹配。我可以通过将new Task<TResult>更改为new Task<Task<TResult>>来包装Task,但是要回到Task<TResult>,我必须将其Unwrap(),而取消包装的任务将具有WaitingForActivation的状态,将我带回了起点。

我将有大量这些,所以对于每个任务使用Wait()阻塞线程不是一个选项。

我考虑过从Task继承并覆盖成员(使用新成员),但如果可能的话,给库用户一个实际的Task而不是DerivedTask会很好,特别是因为我在许多其他位置也呈现常规Tasks进行等待。

你有什么建议吗?

private TaskCompletionSource<TResult> tcs;

private async Task<TResult> CreateStartCompleteAsync()
{
    await tcs.Task;
    if (tcs.Task.IsCanceled)
    {
        throw new OperationCanceledException("");
    }
    else if // etc.
}

public ColdTaskCompletionSource()
{
    tcs = new TaskCompletionSource<TResult>();
    Task = new Task<TResult>(() => CreateStartCompleteAsync());
}

错误:
* 由于块中的一些返回类型不能隐式转换为委托返回类型,因此无法将lambda表达式转换为委托类型 'System.Func'
* 无法将类型'System.Threading.Tasks.Task'隐式转换为'TResult'


我认为我已经在这里回答了一个密切相关的问题(https://dev59.com/BX7aa4cB1Zd3GeqPmBwV#22705236)。 - noseratio - open to work
2
我认为,如果你想要更多的控制权,就不应该基于“Task”来构建API,而是应该基于一个自定义类,该类仅公开你想要公开的信息。你也可以公开一个“Task”,但状态信息应该来自你自己的类。 - usr
对于我的底层操作来说,“未开始”和“已开始”之间在逻辑上存在很大的差异,而库使用此状态进行其他决策。能够直接使用TaskStatus将显著减少复杂性和潜在的混淆。 - Kristian Wedberg
1
@KristianWedberg 我明白。我认为在内部状态代码中过载仅存在于调度库中的概念是对TPL的误用。我会创建一个 class ScheduledTask { MyStatusEnum Status; Task Completed; }。你甚至可以将此类型设置为可等待,尽管我认为这是一种反模式。无论如何,你的问题是合理的,应该按照所问进行回答。 - usr
@usr TaskStatus代码显然是公共API的一部分,但我理解你的观点,例如添加新代码并非不可想象。如果TaskCompletionSource直接支持已创建的任务,则没有人会再考虑使用它。当TPL完全开源并可用时,我可以感受到一个拉取请求即将到来 :-) - Kristian Wedberg
显示剩余5条评论
2个回答

6
虽然我同意评论中@usr的观点,但从技术上讲,您仍然可以拥有提供以下状态的实现:
1.已创建 2.等待运行 3.任一<完成/取消/故障>状态
为了避免使用Task.Wait阻塞线程,您可以使用一个内部帮助程序TaskScheduler,该程序将首先将任务从Created转换为WaitingToRun,最终转换为其中之一已完成的状态。
下面的代码说明了这个概念。 它只经过了非常轻微的测试,可能不完全线程安全,并且可能需要改进以在多个任务之间共享单个FakeTaskScheduler实例。
public class ColdTaskCompletionSource
{
    public sealed class FakeTaskScheduler : TaskScheduler
    {
        Task _task;

        public FakeTaskScheduler()
        {
        }

        protected override void QueueTask(Task task)
        {
            _task = task;
        }

        protected sealed override bool TryDequeue(Task task)
        {
            if (task != _task)
                return false;

            _task = null;
            return true;
        }

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            if (_task == null)
                yield break;
            yield return _task;
        }

        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return false;
        }

        public override int MaximumConcurrencyLevel
        {
            get { return 1; }
        }

        public bool Execute()
        {
            if (_task == null)
                return false;

            var task = _task;
            _task = null;
            return base.TryExecuteTask(task);
        }
    }

    readonly Task _task;
    readonly CancellationTokenSource _cts;
    readonly object _lock = new Object();
    readonly FakeTaskScheduler _ts = new FakeTaskScheduler();
    Action _completionAction = null;

    // helpers

    void InvokeCompletionAction()
    {
        if (_completionAction != null)
            _completionAction();
    }

    void Complete()
    {
        if (_task.Status != TaskStatus.WaitingToRun)
            throw new InvalidOperationException("Invalid Task state");
        _ts.Execute();
    }

    // public API

    public ColdTaskCompletionSource()
    {
        _cts = new CancellationTokenSource();
        _task = new Task(InvokeCompletionAction, _cts.Token);
    }

    public Task Task { get { return _task; } }

    public void Start()
    {
        _task.Start(_ts);
    }

    public void SetCompleted()
    {
        lock (_lock)
            Complete();
    }

    public void SetException(Exception ex)
    {
        lock (_lock)
        {
            _completionAction = () => { throw ex; };
            Complete();
        }
    }

    public void SetCancelled()
    {
        lock (_lock)
        {
            _completionAction = () =>
            {
                _cts.Cancel();
                _cts.Token.ThrowIfCancellationRequested();
            };
            Complete();
        }
    }
}

1
@Noseratio:这个非常好用,非常感谢!上面的代码“需要”按照我的“规范”调用Start()。我进行了修改,通过在Start()中也获取锁来使调用Start()变为可选项:lock(_lock) _task.Start(_ts);,并且在Complete()中,我不再抛出异常,而是这样做:if (_task.Status == TaskStatus.Created) _task.Start(_ts); - Kristian Wedberg

2
您无法以合理的方式实现。
“任务”状态由内部处理,手动创建任务的唯一API是使用“TaskCompletionSource”。您也不能继承自“Task”,因为“Status”属性仅是getter,而且您无法访问后备字段“m_stateFlags”,因为它是内部的。
但是,不合理的方法是创建一个等待“TaskCompletionSource”的任务,并通过控制“TaskCompletionSource”来控制任务的状态:
var taskCompletionSource = new TaskCompletionSource<bool>();
var cancellationTokenSource = new CancellationTokenSource();
var task = new Task(() => taskCompletionSource.Task.Wait(cancellationTokenSource.Token), cancellationTokenSource.Token); // task.Status == TaskStatus.Created

task.Start(); // task.Status == TaskStatus.Running

taskCompletionSource.SetResult(false) // task.Status == TaskStatus.RanToCompletion

或者

taskCompletionSource.SetException(new Exception("")) // task.Status == TaskStatus.Faulted

或者

cancellationTokenSource.Cancel() // task.Status == TaskStatus.Cancelled

我并不是真的建议你这样做。我不确定你为什么想要控制Task的状态,但你可能需要创建自己的构造来进行管理,而不是强制Task进入一个不适合它的设计中。


为什么要创建一个热运行的 Task,然后用 Task.Wait() 阻塞线程池线程呢?使用冷 Task 可以很好地模拟 TaskCompletionSource,这是可能的。虽然它也不完美,因为它将内部 Task 暴露给调用者,但在我看来仍然比阻塞更好。 - noseratio - open to work
“我如何获取一个任务,我手动设置状态(已创建,正在运行,已完成,已取消,已故障)?” - i3arnon
@i3arnon:编辑了问题 - 我有很多这样的问题,Wait()不是一个选项。 - Kristian Wedberg

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接