并发队列性能差

12

我有两个线程。一个线程从串口收集数据,并将其放入数组中,然后添加到并发队列中。另一个线程会实时地绘制出这些数据。每秒钟大约有150个50字节的数据包。

问题在于,当程序运行时,它会消耗大约10%的CPU资源。这是在非常快的I7 Haswell核心上测试的结果。如果在消费线程中添加Thread.Sleep(1),则CPU利用率会降至1%。但是,如果将其设置为Thread.Sleep(2),那么数据包就无法同步了。因此,这不是一种解决方案,如果在较慢的计算机上运行,则可能无法正常工作。

代码很简单。以下是生产者线程:

    static void FillQueue(byte[] buffer)
    {
        dataQueue.Enqueue(buffer);
    }

这里是消费者线程:

        while (continuePolling)
        {                
            Thread.Sleep(1); //if removed, there is 10% CPU utilization.  If higher then packet synchronization is lost.

            if (dataQueue.TryDequeue(out result))
            {
                ProcessPacket(result);
            }               
        }

我觉得这很难理解,因为ProcessPacket方法需要大约0.3毫秒才能执行,每秒调用约10次。

现在我已经尝试使用阻塞集合。以下是生产者代码:

 BlockingCollection<byte[]> dataQueue = new BlockingCollection<byte[]>;

 public static void addData(buffer)
 {
      dataQueue.add(buffer);
 }

这是消费者

 while (dataQueue.TryTake(out result)
 {
     ProcessPacket(result);
 }

完全没有区别!它使用10%的CPU,如果我添加Thread.Sleep(1),它就可以正常工作,但是如果我添加Thread.Sleep(2),就会丢失数据包。 我不明白。每个数据包中有50个字节。就这样。并且它们被生产和消费的速度一样快。 谢谢


听起来你没有在串口上使用异步I/O。请展示一下你的I/O代码。 - user585968
考虑使用BlockingCollection而不是ConcurrentQueue,以避免需要使用阻塞的Take方法调用Thread.Sleep - mjwills
@MickyD串行I/O的时间不到一毫秒,而且该线程不是瓶颈。如果我注释掉消费者,那么性能问题就不存在了。 - Tom
1个回答

20
那是因为您基本上有一个while (true){}循环。每个ProcessPacket需要0.3毫秒,每秒有10个数据包,因此每秒有3毫秒的有用工作。其余的997毫秒中,您的循环不断检查队列中是否有新项目,浪费了CPU资源。
相反,应该和您的队列一起使用BlockingCollection,它对于您的任务有更好的选项。它支持阻塞出列(如果需要,还可以带超时和取消令牌),因此不会浪费CPU:
var dataQueue = new BlockingCollection<string>(new ConcurrentQueue<string>());
// Add instead of Enqueue
dataQueue.Add("some item");
// Take instead of Dequeue, this will block until item is available in queue
var result = dataQueue.Take();
// blocks until item is available or timeout happens
if (dataQueue.TryTake(out result, TimeSpan.FromMilliseconds(100)))

它还支持“流”接口,因此您的代码可以仅为:

foreach (var result in dataQueue.GetConsumingEnumerable()) {
    ProcessPacket(result);        
}

不要使用continuePolling标志,改为调用

dataQueue.CompleteAdding();

当不再期望提供更多项时(在这种情况下,您会将continuePolling设置为false),这将使GetConsumingEnumerable完成提供项目并返回。


嗨,我刚刚使用BlockingCollection更新了问题。我在那里也做错了什么吗?这是一个实时过程,并且将继续获取和处理数据包,直到应用程序关闭。因此,continuePolling需要存在。 - Tom
@Tom 是的 - 你正在使用没有超时的 TryTake 方法,它和你之前的代码完全相同 - 这浪费了 CPU (如果队列中没有任何项目,它会立即返回)。如果你不关心超时 - 调用 var result = dataQueue.Take()(就像我的答案中一样)。 - Evk
太好了!!!CPU只有1%,没有线程休眠。非常感谢,我今晚可以安心睡觉了! - Tom

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接