取消令牌源和嵌套任务

3

我对取消令牌源产生了疑问,下面的代码中我正在使用它:

    void Process()
    {
        //for the sake of simplicity I am taking 1, in original implementation it is more than 1
        var cancellationToken = _cancellationTokenSource.Token;
        Task[] tArray = new Task[1];
        tArray[0] = Task.Factory.StartNew(() =>
        {
            cancellationToken.ThrowIfCancellationRequested();
            //do some work here
            MainTaskRoutine();
        }, cancellationToken);

        try
        {
            Task.WaitAll(tArray);
        }
        catch (Exception ex)
        {
            //do error handling here
        }
    }

    void MainTaskRoutine()
    {
        //for the sake of simplicity I am taking 1, in original implementation it is more than 1
        //this method shows that a nested task is created 
        var cancellationToken = _cancellationTokenSource.Token;
        Task[] tArray = new Task[1];
        tArray[0] = Task.Factory.StartNew(() =>
        {
            cancellationToken.ThrowIfCancellationRequested();
            //do some work here

        }, cancellationToken);

        try
        {
            Task.WaitAll(tArray);
        }
        catch (Exception ex)
        {
         //do error handling here
        }
    }


编辑: 进一步解释

最终目标是: 当用户取消操作时,所有正在进行的立即挂起任务(包括子任务或孙子任务)都应该被取消。

场景: 根据上述代码: 1. 首先检查用户是否要求取消 2. 如果用户没有要求取消,则继续执行任务 (请参阅Process方法)。 样本代码仅显示一个任务,但实际上可能有三个或更多

假设 CPU 在处理 Task1 的同时,其他任务仍在任务队列中等待某个 CPU 来执行它们。 用户请求取消:Process 方法中的 Task 2,3 立即取消,但 Task 1 将继续工作,因为它已经在处理中。

Task 1 中调用了 MainTaskRoutine 方法,该方法又创建了更多的任务。

在 MainTaskRoutine 函数中,我写道:cancellationToken.ThrowIfCancellationRequested();

所以问题是: 这种使用 CancellationTokenSource 的方式是否正确,因为它依赖于 Task.WaitAll()?

2个回答

4
[编辑] 由于您的代码中使用了数组,我假设可能会有多个任务,而不仅仅是一个。我还假设在您从Process开始的每个任务中,您想先进行一些CPU密集型的工作(//在此处执行一些工作),然后再运行MainTaskRoutine
如何处理任务取消异常取决于您的项目设计工作流程。例如,您可以在Process方法内部或从调用Process的位置处理它。如果您唯一的关注点是从跟踪待处理任务的数组中删除Task对象,则可以使用Task.ContinueWith来完成此操作。无论任务的完成状态(CancelledFaultedRanToCompletion)如何,都将执行延续操作。
Task Process(CancellationToken cancellationToken)
{
    var tArray = new List<Task>();
    var tArrayLock = new Object();

    var task = Task.Run(() =>
    {
        cancellationToken.ThrowIfCancellationRequested();
        //do some work here

        return MainTaskRoutine(cancellationToken);
    }, cancellationToken);

    // add the task to the array,
    // use lock as we may remove tasks from this array on a different thread
    lock (tArrayLock)
        tArray.Add(task);
    task.ContinueWith((antecedentTask) =>
    {
        if (antecedentTask.IsCanceled || antecedentTask.IsFaulted)
        {
            // handle cancellation or exception inside the task
            // ...
        }
        // remove task from the array,
        // could be on a different thread from the Process's thread, use lock
        lock (tArrayLock)
            tArray.Remove(antecedentTask);
    }, TaskContinuationOptions.ExecuteSynchronously);

    // add more tasks like the above
    // ...

    // Return aggregated task
    Task[] allTasks = null;
    lock (tArrayLock)
        allTasks = tArray.ToArray();
    return Task.WhenAll(allTasks);
}

您的MainTaskRoutine可以结构化地与Process完全相同,并具有相同的方法签名(返回一个Task)。

然后,您可能希望在由Process返回的聚合任务上执行阻止等待,或异步处理其完成,例如:

// handle the completion asynchronously with a blocking wait
void RunProcessSync()
{
    try
    {
        Process(_cancellationTokenSource.Token).Wait();
        MessageBox.Show("Process complete");
    }
    catch (Exception e)
    {
        MessageBox.Show("Process cancelled (or faulted): " + e.Message);
    }
}

// handle the completion asynchronously using ContinueWith
Task RunProcessAync()
{
    return Process(_cancellationTokenSource.Token).ContinueWith((task) =>
    {
        // check task.Status here
        MessageBox.Show("Process complete (or cancelled, or faulted)");
    }, TaskScheduler.FromCurrentSynchronizationContext());
}

// handle the completion asynchronously with async/await
async Task RunProcessAync()
{
    try
    {
        await Process(_cancellationTokenSource.Token);
        MessageBox.Show("Process complete");
    }
    catch (Exception e)
    {
        MessageBox.Show("Process cancelled (or faulted): " + e.Message);
    }
}

请注意,Task.Run将展开嵌套任务,因此从Task.Run返回的任务实际上是由MainTaskRoutine返回的任务。还要注意使用的TaskContinuationOptions.ExecuteSynchronously,以确保在触发WhenAll之前执行清理代码。 - noseratio - open to work
我相信我发布的代码行为是正确的。所有创建的任务都共享同一个“cancellationToken”。只要您在每个任务内定期调用 cancellationToken.ThrowIfCancellationRequested(),任何挂起的任务都将被取消。ProcessMainTaskRoutine本身不执行任何CPU绑定的任务,因此它们将在其所有子任务已完成(WhenAll)后立即被取消。但是,您可以在“Process”和“MainTaskRoutine”的开头添加cancellationToken.ThrowIfCancellationRequested(),以确保它们不会启动任何子任务。 - noseratio - open to work
我是否可以继续将CancellationTokenSource作为参数传递给Task,以便在需要创建任何子任务时可以使用此参数? - Devesh
1
是的,您可以(而且应该)将相同的 CancellationTokenSource.Token 传递给尽可能多的任务,前提是它们都应该在调用该 CancellationTokenSource 对象上的 CancellationTokenSource.Cancel 后逻辑上被取消。 - noseratio - open to work
应用程序中有一个主线程,它将一直等待于Task.WaitAll()。我所说的即时任务是指由该主线程直接创建的线程/任务。 - Devesh
显示剩余3条评论

1
我做了一些研究,发现这个链接
代码现在看起来像这样: 请查看下面代码中CancellationTokenSource.CreateLinkedTokenSource的用法
    void Process()
    {
        //for the sake of simplicity I am taking 1, in original implementation it is more than 1
        var cancellationToken = _cancellationTokenSource.Token;
        Task[] tArray = new Task[1];
        tArray[0] = Task.Factory.StartNew(() =>
        {
            cancellationToken.ThrowIfCancellationRequested();
            //do some work here
            MainTaskRoutine(cancellationToken);
        }, cancellationToken);

        try
        {
            Task.WaitAll(tArray);
        }
        catch (Exception ex)
        {
            //do error handling here
        }
    }

    void MainTaskRoutine(CancellationToken cancellationToken)
    {
        //for the sake of simplicity I am taking 1, in original implementation it is more than 1
        //this method shows that a nested task is created 

        using (var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
        {
            var cancelToken = cancellationTokenSource.Token;
            Task[] tArray = new Task[1];
            tArray[0] = Task.Factory.StartNew(() =>
            {
                cancelToken.ThrowIfCancellationRequested();
                //do some work here

            }, cancelToken);

            try
            {
                Task.WaitAll(tArray);
            }
            catch (Exception ex)
            {
                //do error handling here
            } 
        }
    }

注意:我还没有使用过它,但一旦完成,我会让您知道 :)

为什么需要一个新的链接取消源?反正我没看到你在它上面调用“Cancel()”。在我看来,使用至少两个标记与“CreateLinkedTokenSource”是有意义的。 - noseratio - open to work
根据我的问题,是用户取消立即任务,这些任务将触发这些“MainTaskRoutine”任务。总的来说,这将对所有待处理任务产生涟漪效应。希望这样说清楚了。 - Devesh
如果取消的顺序不重要,单个原始取消令牌就足以取消所有任务。 - noseratio - open to work

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接