线程安全队列 - 入队 / 出队

34

首先,我将解释一个简短的情景;

当某些设备发出信号时,会向队列添加一种名为 Alarm 的对象。在一定时间间隔内,会检查队列,并对队列中每个 Alarm 调用一个方法。

然而,我遇到的问题是,如果在遍历队列时添加了一个警报,则会抛出错误,提示您在使用它时队列已经更改。以下是一小段代码,用于展示我的队列,请假定不断向其中插入报警;

public class AlarmQueueManager
{
    public ConcurrentQueue<Alarm> alarmQueue = new ConcurrentQueue<Alarm>();
    System.Timers.Timer timer;

    public AlarmQueueManager()
    {
        timer = new System.Timers.Timer(1000);
        timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);
        timer.Enabled = true;
    }

    void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
    {
        DeQueueAlarm();
    }

    private void DeQueueAlarm()
    {
        try
        {
            foreach (Alarm alarm in alarmQueue)
            {
                SendAlarm(alarm);
                alarmQueue.TryDequeue();
                //having some trouble here with TryDequeue..

            }
        }
        catch
        {
        }
    }
所以我的问题是,我该如何使其更加...线程安全?这样我就不会遇到这些问题了。也许可以采用将队列复制到另一个队列中,对其进行操作,然后从原始队列中出队已处理的警报的方法?

编辑:刚被告知有并发队列,现在来查看一下


你应该先从队列中弹出项目,其次才是发送警报,同时使用通用的线程安全队列实现。如果无法处理某个项目,则将其重新加入队列。 - cdleonard
5个回答

36
private void DeQueueAlarm()
{
    Alarm alarm;
    while (alarmQueue.TryDequeue(out alarm))
        SendAlarm(alarm);
}

或者,您可以使用:

private void DeQueueAlarm()
{
    foreach (Alarm alarm in alarmQueue)
        SendAlarm(alarm);
}

根据 MSDN 上关于 ConcurrentQueue<T>.GetEnumerator 的文章:
枚举表示队列内容的瞬时快照。它不反映在调用 GetEnumerator 之后对集合所做的任何更新。枚举器可与从队列读取和写入并发使用。
因此,当多个线程同时调用您的 DeQueueAlarm 方法时,两种方法之间的区别就出现了。使用 TryQueue 方法,您可以确保队列中的每个警报仅被处理一次;然而,哪个线程选择哪个警报是非确定性的。使用 foreach 方法可确保每个竞争线程将处理队列中的所有警报(截至它开始迭代它们的时间点),导致同一警报被多次处理。
如果要确保每个警报仅被处理一次,并随后从队列中删除它,则应使用第一种方法。

所以仅供参考,TryDequeue会推出一个警报,我们将其填充到之前声明的警报变量中,然后使用它? - Kestami
没错。此外,当列表为空时,TryDequeue会返回false,这会导致我们退出while循环。 - Douglas
这很棒:)现在要测试一下。 - Kestami
第一种方法非常有效,正是我想要的。我猜这就是那种东西,一旦你知道了它,你就知道了!谢谢 - Kestami

33

啊,我不知道这个!会去看看。 - Kestami

10

我在使用“.TryDequeue()”时遇到了一些问题。你能帮我解决一下吗? :) 我会将我的问题更新为ConcurrentQueue。 - Kestami

0
更好的方法是考虑到每个线程实际上只处理一个警报,可以用以下方式替代:
        foreach (Alarm alarm in alarmQueue)
        {
            SendAlarm(alarm);
            alarmQueue.TryDequeue();
            //having some trouble here with TryDequeue..
        }

使用这个:

        while (!alarmQueue.IsEmpty)
        {
            Alarm alarm;
            if (!alarmQueue.TryDequeue(out alarm))  continue;
            SendAlarm(alarm);
        }

没有必要在任何时候获取队列的完整快照,因为你只关心每个周期开始时要处理的下一个。


-1
如果你想在不同的线程中使用队列,那么必须对队列进行加锁。就像下面的例子一样:
public object LockQ = new object()
public Queue<Alarm> alarmQueue = new Queue<Alarm>();

线程1: 入队()

while (true)
{
    lock (LockQ)
    {
        alarmQueue.Enqueue(alarm);
    }
}

thread2: 出队()

while (alarmQueue.Count > 0)
{
    lock (LockQ)
    {
        var _Alarm = alarmQueue.Dequeue();
    }
}

但是在不同的线程中使用队列的最优解决方案是使用 ConcurrentQueue

public ConcurrentQueue<Alarm> alarmQueue = new ConcurrentQueue<Alarm>();

线程1: 入队()

while (true)
{
    alarmQueue.Enqueue(alarm);
}

thread2: Dequeue()

while (alarmQueue.TryDequeue(out _Alarm))
{
    SendAlarm(_Alarm);
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接