我有一个 MessagesManager
线程,不同的线程可能会向其发送消息,然后这个 MessagesManager
线程负责在 SendMessageToTcpIP()
(MessagesManager
线程的起始点)中发布这些消息。
class MessagesManager : IMessageNotifier
{
//private
private readonly AutoResetEvent _waitTillMessageQueueEmptyARE = new AutoResetEvent(false);
private ConcurrentQueue<string> MessagesQueue = new ConcurrentQueue<string>();
public void PublishMessage(string Message)
{
MessagesQueue.Enqueue(Message);
_waitTillMessageQueueEmptyARE.Set();
}
public void SendMessageToTcpIP()
{
//keep waiting till a new message comes
while (MessagesQueue.Count() == 0)
{
_waitTillMessageQueueEmptyARE.WaitOne();
}
//Copy the Concurrent Queue into a local queue - keep dequeuing the item once it is inserts into the local Queue
Queue<string> localMessagesQueue = new Queue<string>();
while (!MessagesQueue.IsEmpty)
{
string message;
bool isRemoved = MessagesQueue.TryDequeue(out message);
if (isRemoved)
localMessagesQueue.Enqueue(message);
}
//Use the Local Queue for further processing
while (localMessagesQueue.Count() != 0)
{
TcpIpMessageSenderClient.ConnectAndSendMessage(localMessagesQueue.Dequeue().PadRight(80, ' '));
Thread.Sleep(2000);
}
}
}
不同的线程(3-4个)通过调用
PublishMessage(string Message)
(使用相同的对象到MessageManager)来发送它们的消息。一旦消息收到,我将该消息推送到一个并发队列中,并通过设置_waitTillMessageQueueEmptyARE.Set();
通知SendMessageToTcpIP()
。在SendMessageToTcpIP()
内部,我从并发队列中复制消息到本地队列中,然后逐个发布消息。问题:以这种方式进行入队和出队是否是线程安全的?这样做是否会导致一些奇怪的影响?
GetConsumingEnumerable()
。那段代码只会引入线程安全问题。 - Panagiotis Kanavospublic void SendMessageToTcpIP(){ foreach(var message in MessageQueue.GetConsumingEnumerable(){TcpIpMessageSenderClient.ConnectAndSendMessage(message.PadRight(80, ' ');}}
就足够了。 - Panagiotis KanavosMain()
函数启动了MessageManager
线程,然后在其中调用SendMessageToTcpIP()
函数进行循环。因此,一旦完成任务,它将被重新调用并进入等待状态,直到有新的消息到来。 - skm