C#: 更好的线程架构

3

我很感兴趣了解您对以下规则下哪种线程架构更好的想法:

  • 一个线程必须在应用程序的生命周期内运行,在队列中没有要执行的任务时处于睡眠/等待模式。

  • 线程必须具有BelowNormal优先级(这消除了使用ThreadPool的可能性)。

  • 线程必须在完成任务后向主线程提供反馈。

  • 线程将监视Queue<T>以获取要执行的更多作业。

我正在使用.NET Framework 4.0。

请让我知道您的想法 :)

6个回答

4

当我需要实现自己的多线程处理时,我通常会使用类似以下的方法:

public class MyWorker<T> : IDisposable
{
    private readonly Queue<T> _taskQueue; // task queue
    private readonly object _threadLock = new object();
    private Thread _thread; // worker thread
    private ManualResetEvent _evExit;
    private AutoResetEvent _evNewData;

    /// <summary>Override this to process data.</summary>
    protected abstract void ProcessData(T data);

    /// <summary>Override this to set other thread priority.</summary>
    protected virtual ThreadPriority ThreadPriority
    {
        get { return ThreadPriority.BelowNormal; }
    }

    protected MyWorker()
    {
        _taskQueue = new Queue<T>();
        _evExit = new ManualResetEvent(false);
        _evNewData = new AutoResetEvent(false);
    }

    ~MyWorker()
    {
        Dispose(false);
    }

    private void ThreadProc()
    {
        try
        {
            var wh = new WaitHandle[] { _evExit, _evNewData };
            while(true)
            {
                T data = default(T);
                bool gotData = false;
                lock(_taskQueue) // sync
                {
                    if(_taskQueue.Count != 0) // have data?
                    {
                        data = _taskQueue.Dequeue();
                        gotData = true;
                    }
                }
                if(!gotData)
                {
                    if(WaitHandle.WaitAny(wh) == 0) return; // demanded stop
                    continue; //we have data now, grab it
                }
                ProcessData(data);
                if(_evExit.WaitOne(0)) return;
            }
        }
        catch(ThreadInterruptedException)
        {
            // log warning - this is not normal
        }
        catch(ThreadAbortException)
        {
            // log warning - this is not normal
        }
    }

    public void Start()
    {
        lock(_threadLock)
        {
            if(_thread != null)
                throw new InvalidOperationException("Already running.");
            _thread = new Thread(ThreadProc)
            {
                Name = "Worker Thread",
                IsBackground = true,
                Priority = ThreadPriority,
            };
            _thread.Start();
        }
    }

    public void Stop()
    {
        lock(_threadLock)
        {
            if(_thread == null)
                throw new InvalidOperationException("Is not running.");
            _evExit.Set();
            if(!_thread.Join(1000))
                _thread.Abort();
            _thread = null;
        }
    }

    /// <summary>Enqueue data for processing.</summary>
    public void EnqueueData(T data)
    {
        lock(_taskQueue)
        {
            _taskQueue.Enqueue(data);
            _evNewData.Set(); // wake thread if it is sleeping
        }
    }

    /// <summary>Clear all pending data processing requests.</summary>
    public void ClearData()
    {
        lock(_taskQueue)
        {
            _taskQueue.Clear();
            _evNewData.Reset();
        }
    }

    protected virtual void Dispose(bool disposing)
    {
        lock(_threadLock)
        {
            if(_thread != null)
            {
                _evExit.Set();
                if(!_thread.Join(1000))
                    _thread.Abort();
                _thread = null;
            }
        }
        _evExit.Close();
        _evNewData.Close();
        if(disposing)
            _taskQueue.Clear();
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }
}

3
  • 线程必须具有BelowNormal优先级(这消除了使用ThreadPool的可能性)。

这似乎是使用TPL和ThreadPool的主要难点。您确定您没有高估较低优先级的有用性吗?

您将不得不投入大量的工作才能想出一些始终比TPL弱得多(且测试/可靠性都较差)的东西。

建议您重新考虑此事。


1

个人而言,我通常会自己编写代码,因为我喜欢更严格的控制。

我在媒体浏览器中使用这个:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;
using MediaBrowser.Library.Logging;

namespace MediaBrowser.Library.Threading {

    public static class Async {

        public const string STARTUP_QUEUE = "Startup Queue";

        class ThreadPool {
            List<Action> actions = new List<Action>();
            List<Thread> threads = new List<Thread>();
            string name;
            volatile int maxThreads = 1;

            public ThreadPool(string name) {
                Debug.Assert(name != null);
                if (name == null) {
                    throw new ArgumentException("name should not be null");
                }
                this.name = name;
            }


            public void SetMaxThreads(int maxThreads) {
                Debug.Assert(maxThreads > 0);
                if (maxThreads < 1) {
                    throw new ArgumentException("maxThreads should be larger than 0");
                }

                this.maxThreads = maxThreads;
            }

            public void Queue(Action action, bool urgent) {
                Queue(action, urgent, 0);
            }

            public void Queue(Action action, bool urgent, int delay) {

                if (delay > 0) {
                    Timer t = null;
                    t = new Timer(_ =>
                    {
                        Queue(action, urgent, 0);
                        t.Dispose();
                    }, null, delay, Timeout.Infinite);
                    return;
                }

                lock (threads) {
                    // we are spinning up too many threads
                    // should be fixed 
                    if (maxThreads > threads.Count) {
                        Thread t = new Thread(new ThreadStart(ThreadProc));
                        t.IsBackground = true;
                        // dont affect the UI.
                        t.Priority = ThreadPriority.Lowest;
                        t.Name = "Worker thread for " + name;
                        t.Start();
                        threads.Add(t);
                    }
                }

                lock (actions) {
                    if (urgent) {
                        actions.Insert(0, action);
                    } else {
                        actions.Add(action);
                    }

                    Monitor.Pulse(actions);
                }
            }

            private void ThreadProc() {

                while (true) {

                    lock (threads) {
                        if (maxThreads < threads.Count) {
                            threads.Remove(Thread.CurrentThread);
                            break;
                        }
                    }

                    List<Action> copy;

                    lock (actions) {
                        while (actions.Count == 0) {
                            Monitor.Wait(actions);
                        }
                        copy = new List<Action>(actions);
                        actions.Clear();
                    }

                    foreach (var action in copy) {
                        action();
                    }
                }
            }
        }


        static Dictionary<string, ThreadPool> threadPool = new Dictionary<string, ThreadPool>();

        public static Timer Every(int milliseconds, Action action) {
            Timer timer = new Timer(_ => action(), null, 0, milliseconds);
            return timer;
        }

        public static void SetMaxThreads(string uniqueId, int threads) {
            GetThreadPool(uniqueId).SetMaxThreads(threads);
        }

        public static void Queue(string uniqueId, Action action) {
            Queue(uniqueId, action, null);
        }

        public static void Queue(string uniqueId, Action action, int delay) {
            Queue(uniqueId, action, null,false, delay);
        }

        public static void Queue(string uniqueId, Action action, Action done) {
            Queue(uniqueId, action, done, false);
        }

        public static void Queue(string uniqueId, Action action, Action done, bool urgent) {
            Queue(uniqueId, action, done, urgent, 0);
        }

        public static void Queue(string uniqueId, Action action, Action done, bool urgent, int delay) {

            Debug.Assert(uniqueId != null);
            Debug.Assert(action != null);

            Action workItem = () =>
            {
                try {
                    action();
                } catch (ThreadAbortException) { /* dont report on this, its normal */ } catch (Exception ex) {
                    Debug.Assert(false, "Async thread crashed! This must be fixed. " + ex.ToString());
                    Logger.ReportException("Async thread crashed! This must be fixed. ", ex);
                }
                if (done != null) done();
            };

            GetThreadPool(uniqueId).Queue(workItem, urgent, delay);
        }

        private static ThreadPool GetThreadPool(string uniqueId) {
            ThreadPool currentPool;
            lock (threadPool) {
                if (!threadPool.TryGetValue(uniqueId, out currentPool)) {
                    currentPool = new ThreadPool(uniqueId);
                    threadPool[uniqueId] = currentPool;
                }
            }
            return currentPool;
        }
    }

}

它具有相当优雅的API,唯一我希望有一天添加的功能是回收空线程池。

使用方法:

 // Set the threads for custom thread pool 
 Async.SetMaxThreads("Queue Name", 10); 
 // Perform an action on the custom threadpool named: "Queue Name", when done call ImDone  
 Async.Queue("Queue Name", () => DoSomeThing(foo), () => ImDone(foo)); 

这个程序有一些方便的重载函数,可以让你排队延迟操作,还有一个可以插入紧急任务并跳到队列前面的函数。


1
这种情况非常适合使用 BlockingCollection。创建一个专用线程,以适当的优先级监视队列。当队列中没有项目时,BlockingCollection.Take 方法将自动阻塞。
public class Example
{
  private BlockingCollection<WorkItem> m_Queue = new BlockingCollection<WorkItem>();

  public event EventHandler<WorkItemEventArgs> WorkItemCompleted;

  public Example()
  {
    var thread = new Thread(
      () =>
      {
        while (true)
        {
          WorkItem item = m_Queue.Take();
          // Add code to process the work item here.
          if (WorkItemCompleted != null)
          {
             WorkItemCompleted(this, new WorkItemEventArgs(item));
          }
        }
      });
    thread.IsBackground = true;
    thread.Priority = ThreadPriority.BelowNormal;
    thread.Start();
  }

  public void Add(WorkItem item)
  {
    m_Queue.Add(item);
  }

}

1

通过阅读上述条件

一些问题

1- 是否有其他线程将作业填充到队列< T >中?

如果答案是肯定的,那么可以在此处使用生产者/消费者设计模式,我不了解 .net 4.0,但这种设计可以在 .net 3.5 中实现。

请参见此处的示例。


重要提示:此处链接的有关生产者/消费者模式的文章是不正确的。它包含一个非常微妙但危险的错误,可能会导致其活锁。Microsoft确实需要放弃这篇文章。 - Brian Gideon

0
一个线程池听起来像是个好东西。实际上,你可以通过设置进程优先级来改变.NET自带的线程池的优先级。将进程优先级降低一档,将UI提高一档,这样你就可以让UI处于正常优先级,而线程池则处于较低优先级。

我不建议更改进程优先级。此外,如果您将BELOW_NORMAL_PRIORITY_CLASS用于进程,则必须使用THREAD_PRIORITY_HIGHEST来为GUI线程设置相同的优先级,以获得NORMAL_PRIORITY_CLASS / THREAD_PRIORITY_NORMAL线程具有的相同优先级。请参见http://msdn.microsoft.com/en-us/library/ms685100(VS.85).aspx - Paul Groke

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接