如何在长时间运行的任务取消后正确清理?

6
我创建了一个类,其目的是抽象掉对队列并发访问的控制。
该类设计为在单个线程上实例化,由多个线程编写,然后由后续的单个线程读取。
我在类内部生成了一个长时间运行的任务,它将执行阻塞循环,并在成功出列项目时触发事件。
我的问题是:我的取消长时间运行的任务和随后的清理/重置的实现是否正确使用了CancellationTokenSource对象?
理想情况下,我希望能够停止和重新启动活动对象,同时保持向队列添加的可用性。
我以Peter Bromberg的文章为基础:使用C# 4.0中的生产者/消费者队列和BlockingCollection 以下是代码:
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Test
{
    public delegate void DeliverNextQueuedItemHandler<T>(T item);

public sealed class SOQueueManagerT<T> 
{

    ConcurrentQueue<T> _multiQueue;
    BlockingCollection<T> _queue;
    CancellationTokenSource _canceller;
    Task _listener = null;

    public event DeliverNextQueuedItemHandler<T> OnNextItem;

    public bool IsRunning { get; private set; }
    public int QueueSize
    {
        get
        {
            if (_queue != null)
                return _queue.Count;
            return -1;
        }
    }

    public CancellationTokenSource CancellationTokenSource
    {
        get
        {
            if (_canceller == null)
                _canceller = new CancellationTokenSource();

            return _canceller;
        }
    }

    public SOQueueManagerT()
    {
        _multiQueue = new ConcurrentQueue<T>();
        _queue = new BlockingCollection<T>(_multiQueue);

        IsRunning = false;
    }

    public void Start()
    {
        if (_listener == null)
        {


            IsRunning = true;

            _listener = Task.Factory.StartNew(() =>
            {

                while (!CancellationTokenSource.Token.IsCancellationRequested)
                {
                    T item;
                    if (_queue.TryTake(out item, 100))
                    {
                        if (OnNextItem != null)
                        {

                            OnNextItem(item);
                        }
                    }

                }
            },
            CancellationTokenSource.Token,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
        }
    }

    public void Stop()
    {
        if (_listener != null)
        {
            CancellationTokenSource.Cancel();
            CleanUp();
        }
    }

    public void Add(T item)
    {
        _queue.Add(item);
    }

    private void CleanUp()
    {
        _listener.Wait(2000);
        if (_listener.IsCompleted)
        {
            IsRunning = false;
            _listener = null;
            _canceller = null;
        }
    }


 }
}

更新 最终我选择了以下方案。它不是完美的,但目前已经能够完成工作。

public sealed class TaskQueueManager<T> 
{
    ConcurrentQueue<T> _multiQueue;
    BlockingCollection<T> _queue;
    CancellationTokenSource _canceller;
    Task _listener = null;

    public event DeliverNextQueuedItemHandler<T> OnNextItem;

    public bool IsRunning
    {
        get
        {
            if (_listener == null)
                return false;
            else if (_listener.Status == TaskStatus.Running ||
                _listener.Status == TaskStatus.Created ||
                _listener.Status == TaskStatus.WaitingForActivation ||
                _listener.Status == TaskStatus.WaitingToRun ||
                _listener.IsCanceled)
                return true;
            else
                return false;
        }
    }
    public int QueueSize
    {
        get
        {
            if (_queue != null)
                return _queue.Count;
            return -1;
        }
    }

    public TaskQueueManager()
    {
        _multiQueue = new ConcurrentQueue<T>();
        _queue = new BlockingCollection<T>(_multiQueue);
    }

    public void Start()
    {
        if (_listener == null)
        {
            _canceller = new CancellationTokenSource();

            _listener = Task.Factory.StartNew(() =>
            {
                while (!_canceller.Token.IsCancellationRequested)
                {
                    T item;
                    if (_queue.TryTake(out item, 100))
                    {
                        if (OnNextItem != null)
                        {
                            try
                            {
                                OnNextItem(item);
                            }
                            catch (Exception e)
                            {
                                //log or call an event
                            }
                        }
                    }
                }
            },
            _canceller.Token,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
        }
    }

    public void Stop()
    {
        if (_listener != null)
        {
            _canceller.Cancel();

            if (_listener.IsCanceled && !_listener.IsCompleted)
                _listener.Wait();

            _listener = null;
            _canceller = null;
        }
    }

    public void Add(T item)
    {
        if (item != null)
        {
            _queue.Add(item);
        }
        else
        {
            throw new ArgumentNullException("TaskQueueManager<" + typeof(T).Name + ">.Add item is null");
        }
    }
}
1个回答

1
小心的编程是唯一能解决问题的方法。即使你取消了操作,也可能存在一个未能在适当的时间内完成的待处理操作。它很可能是一个阻塞操作而产生死锁。在这种情况下,你的程序实际上不会终止。
例如,如果我多次调用您的CleanUp方法或在未调用Start的情况下调用该方法,我感觉它会崩溃。
2秒的清理超时似乎比计划更任意,我实际上会确保事物正确关闭或崩溃/挂起(永远不要让并发的东西处于未知状态)。
此外,“IsRunning”是明确设置的,而不是从对象状态推断出来的。
为了得到灵感,我希望你看看我最近编写的一个类似的类,它是一个在后台线程中工作的生产者/消费者模式。你可以在CodePlex上找到源代码。尽管这是为了解决一个非常具体的问题而设计的。
在这里,取消操作通过将只有工作线程能够识别的特定类型入队来解决,并因此开始关闭。这也确保我从不取消待处理的工作,只有整个工作单元才会被考虑。
为了改善这种情况,您可以为当前工作设置一个单独的计时器,并在取消操作时中止或回滚未完成的工作。现在,实现类似于“事务”的行为将需要一些试错,因为您需要考虑每种可能的边界情况并问自己,如果程序在这里崩溃会发生什么?理想情况下,所有这些代码路径都应该导致可恢复或已知状态,从中您可以恢复工作。但正如您可能已经猜到的那样,这将需要谨慎的编程和大量的测试。

约翰:是的,我也注意到多次调用Stop()可能会导致问题。我修改了Stop()方法,使其在返回之前等待任务完成。是的,这使它成为一个阻塞调用,在这个阶段是可以的。我可能会提供一个Stop方法的覆盖,以提供一个超时,在其中转发到Wait(timeout)调用。你提出的IsRunning点是有效的。 - MattC
是的,我看了,很有趣,我明白你做了什么。我要解决的具体问题是正确处理内部消费者任务的启动、停止和重新启动。即使在停止消费者时,也可以添加队列。我已经完全删除了CleanUp方法,并更具体地基于任务状态设置了IsRunning属性。我会保持这个开放,以防有人专门回答我的TPL使用问题。如果没有,我就采纳你的建议。 - MattC
@MattC - FYI,TPL 主要是为了解决并行性问题而设计的,而不是并发性问题。对于这些相当简单的并发性问题来说,使用 TPL 可能是不必要或不可取的。 - John Leidegren
同意,我所做的只是将线程的概念抽象化,并引入了任务...可以说是一个零和游戏... :) - MattC

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接