C#并发队列中的线程安全性

6

我有一个 MessagesManager 线程,不同的线程可能会向其发送消息,然后这个 MessagesManager 线程负责在 SendMessageToTcpIP()MessagesManager 线程的起始点)中发布这些消息。

class MessagesManager : IMessageNotifier
{
    //private
    private readonly AutoResetEvent _waitTillMessageQueueEmptyARE = new AutoResetEvent(false);
    private ConcurrentQueue<string> MessagesQueue = new ConcurrentQueue<string>(); 

    public void PublishMessage(string Message)
    {
        MessagesQueue.Enqueue(Message);
        _waitTillMessageQueueEmptyARE.Set();
    }

    public void SendMessageToTcpIP()
    {
        //keep waiting till a new message comes
        while (MessagesQueue.Count() == 0)
        {
            _waitTillMessageQueueEmptyARE.WaitOne();
        }

        //Copy the Concurrent Queue into a local queue - keep dequeuing the item once it is inserts into the local Queue
        Queue<string> localMessagesQueue = new Queue<string>();

        while (!MessagesQueue.IsEmpty)
        {
            string message;
            bool isRemoved = MessagesQueue.TryDequeue(out message);
            if (isRemoved)
                localMessagesQueue.Enqueue(message);
        }

        //Use the Local Queue for further processing
        while (localMessagesQueue.Count() != 0)
        {
            TcpIpMessageSenderClient.ConnectAndSendMessage(localMessagesQueue.Dequeue().PadRight(80, ' '));
            Thread.Sleep(2000);
        }
    }
}

不同的线程(3-4个)通过调用PublishMessage(string Message)(使用相同的对象到MessageManager)来发送它们的消息。一旦消息收到,我将该消息推送到一个并发队列中,并通过设置_waitTillMessageQueueEmptyARE.Set();通知SendMessageToTcpIP()。在SendMessageToTcpIP()内部,我从并发队列中复制消息到本地队列中,然后逐个发布消息。
问题:以这种方式进行入队和出队是否是线程安全的?这样做是否会导致一些奇怪的影响?

你为什么要检查计数或使用AutoResetEvent?为什么要使用本地队列?ConcurrentQueue是线程安全的,不需要任何代码。如果你想遍历现有的消息,可以使用GetConsumingEnumerable()。那段代码只会引入线程安全问题。 - Panagiotis Kanavos
1
由于您的消息(类型为“字符串”)是不可变的,因此这应该是安全的。但是,如果您的消息是可变的,则即使队列本身是线程安全的,您的使用也不一定是线程安全的。例如,如果某个其他线程在不同的线程正在使用它时改变了消息。 - Matthew Watson
1
即使您不使用它,@skm也会起作用。取消排队消息是一项阻塞操作。一个简单的 public void SendMessageToTcpIP(){ foreach(var message in MessageQueue.GetConsumingEnumerable(){TcpIpMessageSenderClient.ConnectAndSendMessage(message.PadRight(80, ' ');}} 就足够了。 - Panagiotis Kanavos
@Evk:Main()函数启动了MessageManager线程,然后在其中调用SendMessageToTcpIP()函数进行循环。因此,一旦完成任务,它将被重新调用并进入等待状态,直到有新的消息到来。 - skm
请提供一个使用该类的示例。 - aboersch
显示剩余7条评论
4个回答

4

虽然这可能是线程安全的,但.NET内置了许多用于帮助“多发布者一个消费者”模式的类,例如BlockingCollection。您可以将您的类改写为以下形式:

class MessagesManager : IDisposable {
    // note that your ConcurrentQueue is still in play, passed to constructor
    private readonly BlockingCollection<string> MessagesQueue = new BlockingCollection<string>(new ConcurrentQueue<string>());

    public MessagesManager() {
        // start consumer thread here
        new Thread(SendLoop) {
            IsBackground = true
        }.Start();
    }

    public void PublishMessage(string Message) {
        // no need to notify here, will be done for you
        MessagesQueue.Add(Message);
    }

    private void SendLoop() {
        // this blocks until new items are available
        foreach (var item in MessagesQueue.GetConsumingEnumerable()) {
            // ensure that you handle exceptions here, or whole thing will break on exception
            TcpIpMessageSenderClient.ConnectAndSendMessage(item.PadRight(80, ' '));
            Thread.Sleep(2000); // only if you are sure this is required 
        }
    }

    public void Dispose() {            
        // this will "complete" GetConsumingEnumerable, so your thread will complete
        MessagesQueue.CompleteAdding();
        MessagesQueue.Dispose();
    }
}

SendLoop()执行Thread.Sleep(2000);时,发布者是否能够将其消息推送到MessageQueue - skm
是的,出版商可以随时将消息推送到您的队列中。 - Evk

2

.NET已经提供了ActionBlock< T>,它允许将消息发布到缓冲区并异步处理它们。默认情况下,每次仅处理一个消息。

您的代码可以重写为:

//In an initialization function
ActionBlock<string> _hmiAgent=new ActionBlock<string>(async msg=>{
        TcpIpMessageSenderClient.ConnectAndSendMessage(msg.PadRight(80, ' '));
        await Task.Delay(2000);
);

//In some other thread ...
foreach ( ....)
{
    _hmiAgent.Post(someMessage);
}

// When the application closes

_hmiAgent.Complete();
await _hmiAgent.Completion;

ActionBlock提供了许多好处-您可以指定它可以接受的缓冲区项目数量的限制,并指定可以并行处理多个消息。您还可以在处理管道中组合多个块。在桌面应用程序中,可以响应事件将消息发布到管道中,由单独的块进行处理并将结果发布到更新UI的最终块中。
例如,填充可以由中间TransformBlock<TIn,TOut>执行。这种转换是微不足道的,使用该块的成本大于方法,但这只是一个说明。
//In an initialization function
TransformBlock<string> _hmiAgent=new TransformBlock<string,string>(
    msg=>msg.PadRight(80, ' '));

ActionBlock<string> _tcpBlock=new ActionBlock<string>(async msg=>{
        TcpIpMessageSenderClient.ConnectAndSendMessage());
        await Task.Delay(2000);
);

var linkOptions=new DataflowLinkOptions{PropagateCompletion = true};
_hmiAgent.LinkTo(_tcpBlock);

发布代码不会改变。
    _hmiAgent.Post(someMessage);

当应用程序终止时,我们需要等待_tcpBlock完成:
    _hmiAgent.Complete();
    await _tcpBlock.Completion;

Visual Studio 2015+本身使用TPL Dataflow来处理这些场景。

Bar Arnon在TPL Dataflow Is The Best Library You're Not Using中提供了更好的例子,展示了如何在块中同时使用同步和异步方法。


Post(someMessage) 发送的消息是如何被 ActionBlock 接收的? - skm
“Post”是ActionBlock和所有其他块的一种方法。 - Panagiotis Kanavos
@skm TPL Dataflow 提供了一个两步消息发送协议,用于链接块以确保传递。 - VMAtm

0

这是我重构后的代码片段,展示了我如何实现这个功能:

class MessagesManager {
    private readonly AutoResetEvent messagesAvailableSignal = new AutoResetEvent(false);
    private readonly ConcurrentQueue<string> messageQueue = new ConcurrentQueue<string>();

    public void PublishMessage(string Message) {
        messageQueue.Enqueue(Message);
        messagesAvailableSignal.Set();
    }

    public void SendMessageToTcpIP() {
        while (true) {
            messagesAvailableSignal.WaitOne();
            while (!messageQueue.IsEmpty) {
                string message;
                if (messageQueue.TryDequeue(out message)) {
                    TcpIpMessageSenderClient.ConnectAndSendMessage(message.PadRight(80, ' '));
                }
            }
        }
    }
}

注意事项:

  1. 这将完全清空队列:如果至少有一条消息,它将处理所有消息
  2. 删除了2000毫秒的线程休眠

你根本不需要AutoResetEvent。你可以将这两个方法转换为每行一条语句。 - Panagiotis Kanavos
1
我认为信号是必要的:您希望消费者线程等待消息到达,而不是无休止地循环浪费 CPU 周期。 - Tamas Ionut
1
TryDequeue 不会阻塞 => 如果队列中没有元素,它将返回 false - Tamas Ionut
1
@PanagiotisKanavos,阻塞集合与并发集合不同。在这种情况下,信号似乎是避免过度自旋的逻辑方法。转换为阻塞集合可能是一个选项,也可能不是一个选项,或者您坚持认为在所有情况下都是正确的吗?例如,如果PublishMessage被大量写入者调用,其中阻塞(这就是BlockingCollection将要做的,对吧?)是不需要的呢? - Sinatr
@Sinatr 我并没有说它是并发的。我说不需要事件,因为没有自旋。GetConsumingEnumerable() 不会自旋,而是阻塞。使用 TryDequeue 才会引入自旋。Evk 已经发布了一个正确的答案,它既不会自旋也不需要事件。 - Panagiotis Kanavos
显示剩余7条评论

0

由于ConcurrentQueueAutoResetEvent都是线程安全的,所以此代码线程安全的。无论如何,您正在读取字符串而从未写入它们,因此此代码是线程安全的。

但是,您必须确保在某种循环中调用SendMessageToTcpIP
否则,您将面临危险的竞争条件-有些消息可能会丢失:

while (!MessagesQueue.IsEmpty)
        {
            string message;
            bool isRemoved = MessagesQueue.TryDequeue(out message);
            if (isRemoved)
                localMessagesQueue.Enqueue(message);
        }

        //<<--- what happens if another thread enqueues a message here?

        while (localMessagesQueue.Count() != 0)
        {
            TcpIpMessageSenderClient.ConnectAndSendMessage(localMessagesQueue.Dequeue().PadRight(80, ' '));
            Thread.Sleep(2000);
        }

除此之外,AutoResetEvent 是一个非常重的对象。它使用内核对象来同步线程。每次调用都是一个系统调用,可能会很昂贵。考虑使用用户模式同步对象(.net是否提供某种条件变量?)


该循环已经由GetConsumingEnumerable()提供。检查是否为空或检查计数是不必要的。 - Panagiotis Kanavos
@David:SendMessageToTcpIP()Main()的while循环中。如果有新消息,则会在下一次迭代(或下一次调用)SendMessageToTcpIP()时进行处理。 - skm

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接