多个线程访问集合

3

我有如下情况。

我有一个队列,收到消息后创建新线程来处理它。

该线程将消息添加到集合中。然后检查该集合是否包含100个项目,如果是,则将它们发送到其他位置并清除该集合。

我无法使用常规列表,因为会出现“集合已修改,枚举无法继续”的错误。因此,我需要使用线程安全的集合。

但我的担心是,一个线程向其中写入了第100个项目,当它将其发送到其他位置时,另一个线程可能会向集合中添加一个项目。这样,集合就变成了101个项目,触发第100个项目的线程将其清除,我就失去了一个项目。

我不能使用 ConcurrentBag,因为它没有 clear,我也无法迭代该 bag 并逐个删除其中的项目,因为可能会不断地添加新的消息而无法结束。

ConcurrentStack 有一个 clear 方法,但在这种情况下是否有效呢?

以下是一些代码示例,HandleMeasurementMessage 在每条消息上都会在新线程中调用。

private static readonly ConcurrentStack<EventHubDatum> EventHubDataBatch = new ConcurrentStack<EventHubDatum>();

private static void HandleMeasurementMessage(IMessage<MessageEnvelope> msg)
{
    /* Do a bunch of stuff to msg */

   EventHubDataBatch.Push(eventHubDatum);

   if(EventHubDataBatch.Count == 100)
   {
      /* Send them off*/
      EventHubDatabatch.Clear();
   }
}

奇怪的是,只有在没有通过VS2015调试器运行时,枚举被修改的问题才会发生。程序正常运行了一个小时左右。如果我关闭调试器,就会出现这些枚举错误,这就是为什么我尝试切换到线程安全集合的原因。我只是不确定哪个集合适合。

调用HandleMeasurementMessage的代码

_busSingle.Consume<MessageEnvelope>(_queueMeasurement, (msg, MessageReceivedInfo) => Task.Factory.StartNew(() =>
            {
                try
                {
                    HandleMeasurementMessage(msg);
                }
                catch (Exception ex)
                {
                    /* Logging stuff*/
                }
            }));

有很多可用的同步机制... 你研究过.NET中的线程同步吗? - rory.ap
当一条消息到达时,将创建一个新线程来处理它。这是按照字面意思理解的吗?如果您收到100条消息,那么就会有100个线程吗?这些消息从哪里来? - Jeroen van Langen
@Jeroen Van Langen,是的。至少我是这么认为的。我将附上调用此方法的代码。它们来自队列/总线。 - Stuart
3
只需使用简单的“锁”,这里不需要并发集合。 - Evk
我认为 Task.Factory.StartNew() 是不必要的。为什么你想要多线程添加项目呢?这里没有涉及到任何重量级的计算。 - Jeroen van Langen
显示剩余5条评论
2个回答

3
我建议您使用这样的简单锁: ```

我建议您使用像这样的简单锁:

```
private static readonly List<EventHubDatum> EventHubDataBatch = new List<EventHubDatum>();        
private static void HandleMeasurementMessage(IMessage<MessageEnvelope> msg)
{
    /* Do a bunch of stuff to msg */

    EventHubDatum[] toSend = null;
    lock (EventHubDataBatch) {
        EventHubDataBatch.Add(eventHubDatum);

        if (EventHubDataBatch.Count == 100) {
            // copy to local
            toSend = EventHubDataBatch.ToArray();
            EventHubDataBatch.Clear();
        }
    }

    if (toSend != null) {
        /* Send them off*/
    }
}

在这里锁定操作很简短,不应该对你的性能产生明显影响。请注意,如果有100个项目 - 我们将它们复制到本地数组中,并清除源列表,以便在“发送它们”的操作期间不保持锁定状态,这可能需要很长时间。


天啊.... 两个灵魂,一个解决方案... 很不错.. 同样的解决方案用两次没有用处... +1 - Jeroen van Langen
是的,我已经将100个项目复制到另一个列表中,并将该列表发送到发送它们的方法中,因为它是异步的,可能会有点慢(潜在地)。 - Stuart
@Evk 这个模式有官方名称吗?我经常使用它,给它起个名字会很好。 - Jeroen van Langen
@JeroenvanLangen 我也不知道,对我来说它只是“有意义的”。而你提供了完全相同的解决方案,这一事实也证实了这一点 :) - Evk
@Evk 这对于处理网络消息或日志消息非常有用。是一个很好的解决方案,而且运行良好。ConcurrentQueue.Dispose() ;-) - Jeroen van Langen

0
使用同步对象,如AutoResetEvent,以便一次只允许一个线程访问集合。
示例用法:
static AutoResetEvent MeasureMessageEvent = new AutoResetEvent(true);

private static void HandleMeasurementMessage(IMessage<MessageEnvelope> msg)
{
    /* Do a bunch of stuff to msg */

    // Wait for exclusive access
    MeasureMessageEvent.WaitOne();

    EventHubDataBatch.Push(eventHubDatum);

    if(EventHubDataBatch.Count == 100)
    {
       /* Send them off*/
       EventHubDatabatch.Clear();
    }

    // Release exclusive access
    MeasureMessageEvent.Set();
}

这里EventHubdataBatch应该是哪种集合类型? - Stuart
集合类型不重要。 - Krzysztof Bracha
是的,如果您查看上面的代码,AutoResetEvent对象与集合类型没有任何关系。重要的是,在修改集合之前调用WaitOne()函数,并确保在调用Set()函数后释放访问权限。 - Krzysztof Bracha

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接