我有如下情况。
我有一个队列,收到消息后创建新线程来处理它。
该线程将消息添加到集合中。然后检查该集合是否包含100个项目,如果是,则将它们发送到其他位置并清除该集合。
我无法使用常规列表,因为会出现“集合已修改,枚举无法继续”的错误。因此,我需要使用线程安全的集合。
但我的担心是,一个线程向其中写入了第100个项目,当它将其发送到其他位置时,另一个线程可能会向集合中添加一个项目。这样,集合就变成了101个项目,触发第100个项目的线程将其清除,我就失去了一个项目。
我不能使用 ConcurrentBag,因为它没有 clear,我也无法迭代该 bag 并逐个删除其中的项目,因为可能会不断地添加新的消息而无法结束。
ConcurrentStack 有一个 clear 方法,但在这种情况下是否有效呢?
以下是一些代码示例,HandleMeasurementMessage 在每条消息上都会在新线程中调用。
private static readonly ConcurrentStack<EventHubDatum> EventHubDataBatch = new ConcurrentStack<EventHubDatum>();
private static void HandleMeasurementMessage(IMessage<MessageEnvelope> msg)
{
/* Do a bunch of stuff to msg */
EventHubDataBatch.Push(eventHubDatum);
if(EventHubDataBatch.Count == 100)
{
/* Send them off*/
EventHubDatabatch.Clear();
}
}
奇怪的是,只有在没有通过VS2015调试器运行时,枚举被修改的问题才会发生。程序正常运行了一个小时左右。如果我关闭调试器,就会出现这些枚举错误,这就是为什么我尝试切换到线程安全集合的原因。我只是不确定哪个集合适合。
调用HandleMeasurementMessage的代码
_busSingle.Consume<MessageEnvelope>(_queueMeasurement, (msg, MessageReceivedInfo) => Task.Factory.StartNew(() =>
{
try
{
HandleMeasurementMessage(msg);
}
catch (Exception ex)
{
/* Logging stuff*/
}
}));
Task.Factory.StartNew()
是不必要的。为什么你想要多线程添加项目呢?这里没有涉及到任何重量级的计算。 - Jeroen van Langen