.NET的线程安全缓冲区

4
(注意:虽然我希望了解.Net 4.0的未来想法,但在这个项目中我只能使用.Net 3.5。)
我有一个线程,它正在异步地从外部设备读取数据(通过创造性的“strSomeData”在代码示例中模拟),并将其存储在StringBuilder“buffer”(strBuilderBuffer)中。
在“主代码”中,我想要“啃食”这个“缓冲区”。但是,从“操作”角度来看,我不确定如何以线程安全的方式执行此操作。我知道从“数据”角度来看是安全的,因为根据msdn的说法:“此( StringBuilder )类型的任何公共静态成员都是线程安全的。任何实例成员都不能保证线程安全。”但是,我的下面的代码说明了从“操作”角度来看,它可能不是线程安全的。
关键是我担心代码的两行:
string strCurrentBuffer = ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.ToString();
// Thread 'randomly' slept due to 'inconvenient' comp resource scheduling...
ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.Length = 0;

如果计算机操作系统在读取缓冲区和清除缓冲区之间让我的线程休眠,我可能会丢失数据(这很糟糕 :-(
有什么方法可以保证这两行的原子性并强制计算机不中断它们吗?
关于Vlad下面关于使用“lock”的建议,我尝试过了,但它根本没用:
    public void BufferAnalyze()
    {
        String strCurrentBuffer;
        lock (ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer)
        {
            strCurrentBuffer = ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.ToString();
            Console.WriteLine("[BufferAnalyze()]  ||<<  Thread 'Randomly' Slept due to comp resource scheduling");
            Thread.Sleep(1000);  //  Simulate poor timing of thread resourcing...
            ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.Length = 0;
        }
        Console.WriteLine("[BufferAnalyze()]\r\nstrCurrentBuffer[{0}] == {1}", strCurrentBuffer.Length.ToString(), strCurrentBuffer);
    }

有更好的实现线程安全缓冲区的方法吗?

这是完整的代码:

namespace ExploringThreads
{
    /// <summary>
    /// Description of BasicThreads_TestThreadSafety_v1a
    /// </summary>
    class ThreadWorker_TestThreadSafety_v1a
    {
        private Thread thread;
        public static StringBuilder strBuilderBuffer = new StringBuilder("", 7500);
        public static StringBuilder strBuilderLog = new StringBuilder("", 7500);

        public bool IsAlive
        {
            get { return thread.IsAlive; }
        }

        public ThreadWorker_TestThreadSafety_v1a(string strThreadName)
        {
            // It is possible to have a thread begin execution as soon as it is created.
            // In the case of MyThread this is done by instantiating a Thread object inside MyThread's constructor.
            thread = new Thread(new ThreadStart(this.threadRunMethod));
            thread.Name = strThreadName;
            thread.Start();
        }

        public ThreadWorker_TestThreadSafety_v1a() : this("")
        {
            //   NOTE:  constructor overloading ^|^
        }

        //Entry point of thread.
        public void threadRunMethod()
        {
            Console.WriteLine("[ThreadWorker_TestThreadSafety_v1a threadRunMethod()]");
            Console.WriteLine(thread.Name + " starting.");
            int intSomeCounter = 0;
            string strSomeData = "";
            do
            {
                Console.WriteLine("[ThreadWorker_TestThreadSafety_v1a threadRunMethod()] running.");
                intSomeCounter++;
                strSomeData = "abcdef" + intSomeCounter.ToString() + "|||";
                strBuilderBuffer.Append(strSomeData);
                strBuilderLog.Append(strSomeData);
                Thread.Sleep(200);
            } while(intSomeCounter < 15);

            Console.WriteLine(thread.Name + " terminating.");
        }
    }
    /// <summary>
    /// Description of BasicThreads_TestThreadSafety_v1a.
    /// </summary>
    public class BasicThreads_TestThreadSafety_v1a
    {
        public BasicThreads_TestThreadSafety_v1a()
        {
        }

        public void BufferAnalyze()
        {
            string strCurrentBuffer = ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.ToString();
            Console.WriteLine("[BufferAnalyze()]  ||<<  Thread 'Randomly' Slept due to comp resource scheduling");
            Thread.Sleep(1000);  //  Simulate poor timing of thread resourcing...
            ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.Length = 0;
            Console.WriteLine("[BufferAnalyze()]\r\nstrCurrentBuffer[{0}] == {1}", strCurrentBuffer.Length.ToString(), strCurrentBuffer);
        }

        public void TestBasicThreads_TestThreadSafety_v1a()
        {
            Console.Write("Starting TestBasicThreads_TestThreadSafety_v1a  >>>  Press any key to continue . . . ");
            Console.Read();

            // First, construct a MyThread object.
            ThreadWorker_TestThreadSafety_v1a threadWorker_TestThreadSafety_v1a = new ThreadWorker_TestThreadSafety_v1a("threadWorker_TestThreadSafety_v1a Child");

            do
            {
                Console.WriteLine("[TestBasicThreads_TestThreadSafety_v1a()]");
                Thread.Sleep(750);
                BufferAnalyze();
                //} while (ThreadWorker_TestThreadSafety_v1a.thread.IsAlive);
            } while (threadWorker_TestThreadSafety_v1a.IsAlive);
            BufferAnalyze();
            Thread.Sleep(1250);
            Console.WriteLine("[TestBasicThreads_TestThreadSafety_v1a()]");
            Console.WriteLine("ThreadWorker_TestThreadSafety_v1a.strBuilderLog[{0}] == {1}", ThreadWorker_TestThreadSafety_v1a.strBuilderLog.Length.ToString(), ThreadWorker_TestThreadSafety_v1a.strBuilderLog);

            Console.Write("Completed TestBasicThreads_TestThreadSafety_v1a  >>>  Press any key to continue . . . ");
            Console.Read();
        }
    }
}

1
你使用的是哪个版本的.NET?如果你正在使用.NET 4.0,我会考虑使用“BlockingCollection”或“ConcurrentQueue”。 - James Michael Hare
@JamesMichaelHare 目前是3.5版本... 不过我会在未来的项目中考虑这个问题,谢谢。 - George 2.0 Hope
好的,那就更新你的帖子,表明它只适用于3.5版本。 - James Michael Hare
你所担心的那两行代码甚至不在代码片段中? - vidstige
@vidstige,它们位于方法public void BufferAnalyze()中(但出于说明目的,它们不是连续的)。 - George 2.0 Hope
你考虑过使用管道(可能是匿名管道)而不是字符串缓冲区吗?在我看来,这样做会更加优雅地解决你的问题,而无需担心锁定。 - Eric Petroelje
3个回答

6

在这里下载3.5版本的响应式扩展库。也可以使用NuGet包进行安装。下载后,在您的项目中引用System.Threading.dll即可。

现在,您可以在.NET 3.5中使用所有新的并发集合,这与.NET 4.0中一样。对于您的情况,最好的集合是BlockingCollection。它基本上是一个缓冲区,允许线程像普通队列一样加入和取出项目。唯一不同的是取出操作会阻塞,直到有项目可用为止。

现在完全没有必要使用StringBuilder类。下面是我重构代码的方式。我尽量简短,以便更容易理解。

public class Example
{
  private BlockingCollection<string> buffer = new BlockingCollection<string>();

  public Example()
  {
    new Thread(ReadFromExternalDevice).Start();
    new Thread(BufferAnalyze).Start();
  }

  private void ReadFromExteneralDevice()
  {
    while (true)
    {
      string data = GetFromExternalDevice();
      buffer.Add(data);
      Thread.Sleep(200);
    }
  }

  private void BufferAnalyze()
  {
    while (true)
    {
      string data = buffer.Take(); // This blocks if nothing is in the queue.
      Console.WriteLine(data);
    }
  } 
}

以后需要用到与BlockingCollection基本相同功能的类,可以使用TPL Data Flow库中的BufferBlock<T>类。该类将在.NET 4.5中提供。


史蒂文之前提到过:"请注意,在许多情况下使用锁仍然优于ReaderWriterLockSlim",那么在使用BlockingCollection时是否需要考虑这一点? - George 2.0 Hope
我同意Steven的观点。读写锁在特定情况下非常有用,但在大多数情况下实际上会更慢。不过这并不重要。BlockingCollection在内部并不使用读写锁。但是,它高度调整以实现最大并发性,因为它使用了许多低锁策略。 - Brian Gideon

3

使用 StringBuffer 不是线程安全的,但你可以切换到 ConcurrentQueue<char>

如果需要其他数据结构,.NET 4 中有其他线程安全的集合,请参见http://msdn.microsoft.com/en-us/library/dd997305.aspx


编辑:在 .NET 3.5 中,同步原语较少。您可以将锁添加到 Queue<char> 周围以制作简单的解决方案,尽管它的效率低于 .NET 4 的 ConcurrentQueue。或者再次使用相同的 StringBuffer,并使用lock操作读写操作:

public static StringBuilder strBuilderBuffer = new StringBuilder("", 7500);
private object BufferLock = new object();

...

lock (BufferLock)
    strBuilderBuffer.Append(strSomeData);

...

string strCurrentBuffer;
lock (BufferLock)
{
    strCurrentBuffer = ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.ToString();
    ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.Clear();
}
Console.WriteLine("[BufferAnalyze()]  ||<<  Thread 'Randomly' Slept ...");
Thread.Sleep(1000);  //  Simulate poor timing of thread resourcing...

编辑:

你不能保证操作系统不会挂起持有锁的工作线程。然而,锁可以保证只要一个线程正在处理缓冲区,其他线程将无法干扰和更改缓冲区。

这就是为什么持有锁的时间应该尽可能短的原因:

  • 获取锁,添加数据,释放锁 - 或 -
  • 获取锁,复制数据,清空缓冲区,释放锁,开始处理已复制的数据。

不幸的是,这个平台受限于3.5版本... 不过我会记住这一点,以备将来的项目。 - George 2.0 Hope
@Geo:那么你或许应该使用一个简单的“锁”。 - Vlad
你上面展示的对我来说没有意义,与我所尝试做的事情不符合...然而,我已经在上面的问题中添加了我想要做的事情。 - George 2.0 Hope
我不确定我理解BufferLock的作用,或者为什么它要放在strBuilderBuffer.Append(strSomeData);语句之前。在public void BufferAnalyze()中,我需要保证缓冲区的原子性,在读取和清除缓冲区之间(就像我上面重新编辑问题时尝试做的那样)。如果我表达不清楚,请见谅。 - George 2.0 Hope
@Geo:你必须在锁定状态下清空缓冲区,释放锁定,然后才能继续处理。 - Vlad

1

谢谢,我现在正在研究 'ReaderWriterLockSlim'(.Net 3.5+ :-))。 - George 2.0 Hope
1
ReaderWriterLock并不比简单锁更好,因为只有一个生产者和一个消费者。 - Vlad
2
请注意,在许多情况下,使用 lock 仍然优于 ReaderWriterLockSlim。因此,首先使用简单的 lock,只有在速度不够快时(没有过早优化)才进行调整。 - Steven
是的,我的测试表明,在简单场景下,普通锁大约比ReaderWriterLock快8倍,比较新的ReaderWriterLockSlim快2倍。在读者数量显著多于写者并且读操作足够长以克服额外开销的平衡点的情况下,RWL在速度上更快。 - Brian Gideon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接