创建线程时出现了NullReferenceException异常

5

我在查看有关创建简单线程池的 线程 时,发现了 @MilanGardian 的 .NET 3.5 响应,它非常优雅并且符合我的目的:

using System;
using System.Collections.Generic;
using System.Threading;

namespace SimpleThreadPool
{
    public sealed class Pool : IDisposable
    {
        public Pool(int size)
        {
            this._workers = new LinkedList<Thread>();
            for (var i = 0; i < size; ++i)
            {
                var worker = new Thread(this.Worker) { Name = string.Concat("Worker ", i) };
                worker.Start();
                this._workers.AddLast(worker);
            }
        }

        public void Dispose()
        {
            var waitForThreads = false;
            lock (this._tasks)
            {
                if (!this._disposed)
                {
                    GC.SuppressFinalize(this);

                    this._disallowAdd = true; // wait for all tasks to finish processing while not allowing any more new tasks
                    while (this._tasks.Count > 0)
                    {
                        Monitor.Wait(this._tasks);
                    }

                    this._disposed = true;
                    Monitor.PulseAll(this._tasks); // wake all workers (none of them will be active at this point; disposed flag will cause then to finish so that we can join them)
                    waitForThreads = true;
                }
            }
            if (waitForThreads)
            {
                foreach (var worker in this._workers)
                {
                    worker.Join();
                }
            }
        }

        public void QueueTask(Action task)
        {
            lock (this._tasks)
            {
                if (this._disallowAdd) { throw new InvalidOperationException("This Pool instance is in the process of being disposed, can't add anymore"); }
                if (this._disposed) { throw new ObjectDisposedException("This Pool instance has already been disposed"); }
                this._tasks.AddLast(task);
                Monitor.PulseAll(this._tasks); // pulse because tasks count changed
            }
        }

        private void Worker()
        {
            Action task = null;
            while (true) // loop until threadpool is disposed
            {
                lock (this._tasks) // finding a task needs to be atomic
                {
                    while (true) // wait for our turn in _workers queue and an available task
                    {
                        if (this._disposed)
                        {
                            return;
                        }
                        if (null != this._workers.First && object.ReferenceEquals(Thread.CurrentThread, this._workers.First.Value) && this._tasks.Count > 0) // we can only claim a task if its our turn (this worker thread is the first entry in _worker queue) and there is a task available
                        {
                            task = this._tasks.First.Value;
                            this._tasks.RemoveFirst();
                            this._workers.RemoveFirst();
                            Monitor.PulseAll(this._tasks); // pulse because current (First) worker changed (so that next available sleeping worker will pick up its task)
                            break; // we found a task to process, break out from the above 'while (true)' loop
                        }
                        Monitor.Wait(this._tasks); // go to sleep, either not our turn or no task to process
                    }
                }

                task(); // process the found task
                this._workers.AddLast(Thread.CurrentThread);
                task = null;
            }
        }

        private readonly LinkedList<Thread> _workers; // queue of worker threads ready to process actions
        private readonly LinkedList<Action> _tasks = new LinkedList<Action>(); // actions to be processed by worker threads
        private bool _disallowAdd; // set to true when disposing queue but there are still tasks pending
        private bool _disposed; // set to true when disposing queue and no more tasks are pending
    }


    public static class Program
    {
        static void Main()
        {
            using (var pool = new Pool(5))
            {
                var random = new Random();
                Action<int> randomizer = (index =>
                {
                    Console.WriteLine("{0}: Working on index {1}", Thread.CurrentThread.Name, index);
                    Thread.Sleep(random.Next(20, 400));
                    Console.WriteLine("{0}: Ending {1}", Thread.CurrentThread.Name, index);
                });

                for (var i = 0; i < 40; ++i)
                {
                    var i1 = i;
                    pool.QueueTask(() => randomizer(i1));
                }
            }
        }
    }
}

我是这样使用的:

static void Main(string[] args)
{
   ...
   ...
      while(keepRunning)
      {
         ...
         pool.QueueTask(() => DoTask(eventObject);
      }
   ...
}

private static void DoTask(EventObject e)
{
   // Do some computations

   pool.QueueTask(() => DoAnotherTask(eventObject)); // this is a relatively smaller computation
}

我运行代码大约两天后出现以下异常:

Unhandled Exception: System.NullReferenceException: Object reference not set to an instance of an object.
   at System.Collections.Generic.LinkedList`1.InternalInsertNodeBefore(LinkedListNode`1 node, LinkedListNode`1 newNode)
   at System.Collections.Generic.LinkedList`1.AddLast(T value)
   at MyProg.Pool.Worker()
   at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Threading.ThreadHelper.ThreadStart()

我无法确定是什么原因导致了这个问题,因为我无法再次复现这个错误。有没有任何建议来解决这个问题?


你的堆栈跟踪指向 this._workers.AddLast(Thread.CurrentThread); 作为罪犯。我在C#中没有太多使用LinkedLists,但也许它在顺序上被绊倒了,不是线程安全的。 - TyCobb
1
几乎所有的“NullReferenceException”异常情况都是相同的。请参考“.NET中的NullReferenceException是什么?”获取一些提示。 - John Saunders
我认为你遇到了NULLReferenceException错误,因为你还没有初始化或正确创建对象。尝试使用new创建对象 - 这可能会有所帮助。 - Shumail
2个回答

3

我觉得我找到了问题所在。这段代码样例中漏掉了一个lock()

private void Worker()
{
    Action task = null;
    while (true) // loop until threadpool is disposed
    {
        lock (this._tasks) // finding a task needs to be atomic
        {
            while (true) // wait for our turn in _workers queue and an available task
            {
            ....
            }
        }

        task(); // process the found task
        this._workers.AddLast(Thread.CurrentThread);
        task = null;
    }
}

锁定应该在this._workers.AddLast(Thread.CurrentThread);周围进行扩展或包装。

如果您查看修改LinkedList的其他代码(Pool.QueueTask),则被包装在lock中。


+1 谢谢。这似乎解决了问题。如果我再次观察到这个问题,我会更新的。 - Legend

3

看起来对 _workers 链表的访问没有得到适当的同步。考虑以下情况:

假设在某个时刻,this._workets 列表包含一个元素。

第一个线程调用 this._workers.AddLast(Thread.CurrentThread); 但在 AddLast() 方法内部的一个非常特殊的地方被中断:

public void AddLast(LinkedListNode<T> node)
{
    this.ValidateNewNode(node);
    if (this.head == null)
    {
        this.InternalInsertNodeToEmptyList(node);
    }
    else
    {
        // here we got interrupted - the list was not empty,
        // but it would be pretty soon, and this.head becomes null
        // InternalInsertNodeBefore() does not expect that
        this.InternalInsertNodeBefore(this.head, node);
    }
    node.list = (LinkedList<T>) this;
}

其他线程调用了this._workers.RemoveFirst();。该语句周围没有lock(),因此它完成后列表为空。现在,AddLast()应该调用InternalInsertNodeToEmptyList(node);,但由于条件已经被评估过了,所以无法这样做。

在单个this._workers.AddLast()行周围放置一个简单的lock(this._tasks)可以防止这种情况发生。

其他糟糕的情况包括两个线程同时向同一列表添加项目。


+1 感谢您的时间。将语句包装在锁内解决了问题。 - Legend

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接