检测父任务取消的正确方法是什么?

3

我正在开发一个使用任务和信号量的概念验证应用程序来分解数字列表,目前我有一个List<Task>的任务列表,这些任务接受一个FactorNumberClass并计算FactorNumberClass中指定数字的因数,这目前运行正确。对于每个任务T,我都有一个ContinueWith任务,它更新已分解数字的总数进度、分解时间的平均值,并使用(成功分解的数字)/(待分解的总数)的值更新进度条。当分解这些Tasks时,它们会进入一个SemaphoreSlim.Wait(cancelToken),该函数限制当前分解活动为5个活动的Tasks。最后,我有一个ContinueWhenAll,在所有任务完成时记录日志。如果没有取消操作,这一切都按照我的意图正常工作。

问题出现在尝试取消任务时,我无法检测到任务是否已被取消,因此无法准确确定数字是否已成功分解或已取消。如何检测父任务是否已取消或已完成?

取消令牌定义:

public static CancellationTokenSource tokenSource = new CancellationTokenSource();
public static CancellationToken ct = tokenSource.Token;

因子类代码:

public class FactorNumberClass
{
    public FactorNumberClass()
    {
    }

    public FactorNumberClass(int num, int threadnum)
    {
        this.number = num;
        this.threadNumber = threadnum;
    }

    public List<int> factors = new List<int>();
    public int number;
    public int max;
    public int threadNumber;
}

分解方法:

public void Factor(FactorNumberClass F, CancellationToken token)
        {
            LogtoStatusText("Thread: " + F.threadNumber + " Trying to enter semaphore");

            try
            {
                ASemaphore.Wait(ct);

                F.max = (int)Math.Sqrt(F.number);  //round down

                for (int factor = 1; factor <= F.max; ++factor)
                { //test from 1 to the square root, or the int below it, inclusive.
                    if (F.number % factor == 0)
                    {
                        F.factors.Add(factor);
                        if (factor != F.number / factor)
                        {
                            F.factors.Add(F.number / factor);
                        }
                    }
                }

                F.factors.Sort();
                Thread.Sleep(F.number * 300);
                LogtoStatusText("Task: " + F.threadNumber + " Completed - Factors: " + string.Join(",", F.factors.ToArray()));
                LogtoStatusText("Thread: " + F.threadNumber + " Releases semaphore with previous count: " + ASemaphore.Release());
            }
            catch (OperationCanceledException ex)
            {
                LogtoStatusText("Thread: " + F.threadNumber + " Cancelled.");
            }
            finally
            {
            }
        }

开始处理的方法:

public void btnStart_Click(object sender, RoutedEventArgs e)
        {
            Task T;
            List<Task> TaskList = new List<Task>();
            LogtoStatusText("**** Begin creating tasks *****");
            s1.Start();

            AProject.FactorClassList.ForEach((f) =>
            {
                T = new Task(((x) => { OnUIThread(() => { RunningTasks++; }); Factor(f, ct); }), ct);

                T.ContinueWith((y) =>
                {
                    if (y.IsCompleted)
                    {
                        AProject.TotalProcessedAccounts++;
                        AProject.AverageProcessTime = (((Double)AProject.TotalProcessedAccounts / s1.ElapsedMilliseconds) * 1000);
                    }
                    OnUIThread(() => { RunningTasks--; });
                    OnUIThread(() => { UpdateCounts(AProject); });
                });

                TaskList.Add(T);
            });

            try
            {
                Task.Factory.ContinueWhenAll(TaskList.ToArray(), (z) => { LogtoStatusText("**** Completed all Tasks *****"); OnUIThread(() => { UpdateCounts(AProject); }); });
            }
            catch (AggregateException a)
            {
                // For demonstration purposes, show the OCE message.
                foreach (var v in a.InnerExceptions)
                    LogtoStatusText("msg: " + v.Message);
            }

            LogtoStatusText("**** All tasks have been initialized, begin processing *****");
            TaskList.ForEach(t => t.Start());
        }
3个回答

2

finally块中释放信号量,以便始终正确释放。无需检测取消。

另外,埋藏在日志消息中的副作用不是好的编程风格:

LogtoStatusText("..." + ASemaphore.Release());

我只是通过文本搜索才发现这个错误,否则我永远不会注意到它。

很好,LogtoStatusText只是一个输出,用于查看应用程序的运行情况,但您完全正确。 - Pseudonym
此外,如果已请求取消,则我不希望现有的等待任务进入信号量,如果不使用Wait(cancellationToken),它们将进入该信号量。 - Pseudonym
我意识到这个答案可能并不能帮助你解决问题。考虑让取消异常通过throw;Factor中传播出去。这样可以传播取消操作。另外,考虑在一些地方使用await来简化逻辑。现在有些东西很难读懂。await特别适合自动切换回UI线程。 - usr
除此之外,即使取消令牌(ct)引发了取消事件,使用Finally释放信号量也会每次释放它,这最终会将信号量计数增加到超过最大值。 - Pseudonym
无法使用await,我被困在VS2010和.NET 4.0中。 - Pseudonym

1
使用取消令牌:
using System;
using System.Threading;
using System.Threading.Tasks;
class Program
{
    static void Main()
    {
        var tokenSource2 = new CancellationTokenSource();
        CancellationToken ct = tokenSource2.Token;

        var task = Task.Factory.StartNew(() =>
        {

            // Were we already canceled?
            ct.ThrowIfCancellationRequested();

            bool moreToDo = true;
            while (moreToDo)
            {
                // Poll on this property if you have to do 
                // other cleanup before throwing. 
                if (ct.IsCancellationRequested)
                {
                    // Clean up here, then...
                    ct.ThrowIfCancellationRequested();
                }

            }
        }, tokenSource2.Token); // Pass same token to StartNew.

        tokenSource2.Cancel();

        // Just continue on this thread, or Wait/WaitAll with try-catch: 
        try
        {
            task.Wait();
        }
        catch (AggregateException e)
        {
            foreach (var v in e.InnerExceptions)
                Console.WriteLine(e.Message + " " + v.Message);
        }
        finally
        {
            tokenSource2.Dispose();
        }

        Console.ReadKey();
    }
}

https://msdn.microsoft.com/en-us/library/dd997396%28v=vs.110%29.aspx


这与信号量和ContinueWith有什么关联?我以前看过这篇文章,但我认为它没有解决我的问题。 - Pseudonym

0

我终于找到了我一直在寻找的解决方案,它允许我启动(Start())所有的Task对象,通过一个信号量进行运行,观察CancellationToken,然后检测Task是否被取消或正常完成。在这种情况下,只有当Task进入信号量并开始处理之前,CancellationTokenSource.Cancel()才会被触发,Task才会“正常完成”。

这个答案:优雅地处理任务取消将我引导到了正确的方向。最终我捕获了OperationCancelledException,记录了它,然后重新抛出它,以便在ContinueWithTask中进行检查。

以下是解决我的问题的更新代码:

Factor类:

private void Factor(FactorNumberClass F)
        {


            LogtoStatusText("Thread: " + F.threadNumber + " Trying to enter semaphore");

            try
            {
                ASemaphore.Wait(ct);

                F.max = (int)Math.Sqrt(F.number);  //round down

                for (int factor = 1; factor <= F.max; ++factor)
                { //test from 1 to the square root, or the int below it, inclusive.
                    if (F.number % factor == 0)
                    {
                        F.factors.Add(factor);
                        if (factor != F.number / factor)
                        {
                            F.factors.Add(F.number / factor);
                        }
                    }
                }

                F.factors.Sort();

                Thread.Sleep(F.number * 300);

                LogtoStatusText("Task: " + F.threadNumber + " Completed - Factors: " + string.Join(",", F.factors.ToArray()));

                LogtoStatusText("Thread: " + F.threadNumber + " Releases semaphore with previous count: " + ASemaphore.Release());
            }
            catch
            {
                LogtoStatusText("Thread: " + F.threadNumber + " Cancelled");
                throw;

            }
            finally
            {

            }

        }

处理方法:

public void btnStart_Click(object sender, RoutedEventArgs e)
{
    LaunchTasks();
}

private void LaunchTasks()
        {
            Task T;
            List<Task> TaskList = new List<Task>();

            LogtoStatusText("**** Begin creating tasks *****");

            s1.Start();

            AProject.FactorClassList.ForEach((f) =>
            {
                T = new Task(((x) => { OnUIThread(() => { RunningTasks++; }); Factor(f); }), ct);

                T.ContinueWith((y) =>
                {
                    if (y.Exception != null)
                    {
                        // LogtoStatusText(y.Status + " with "+y.Exception.InnerExceptions[0].GetType()+": "+ y.Exception.InnerExceptions[0].Message);
                    }
                    if (!y.IsFaulted)
                    {

                        AProject.TotalProcessedAccounts++;
                        AProject.AverageProcessTime = (((Double)AProject.TotalProcessedAccounts / s1.ElapsedMilliseconds) * 1000);
                    }
                    OnUIThread(() => { RunningTasks--; });
                    OnUIThread(() => { UpdateCounts(AProject); });


                });

                TaskList.Add(T);
            });

            try
            {
                Task.Factory.ContinueWhenAll(TaskList.ToArray(), (z) => { LogtoStatusText("**** Completed all Tasks *****"); OnUIThread(() => { UpdateCounts(AProject); }); });
            }
            catch (AggregateException a)
            {
                // For demonstration purposes, show the OCE message.
                foreach (var v in a.InnerExceptions)
                    LogtoStatusText("msg: " + v.Message);
            }

            LogtoStatusText("**** All tasks have been initialized, begin processing *****");

            TaskList.ForEach(t => t.Start());
        }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接