尝试在ConcurrentQueue中使用Dequeue方法。

32
在`ConcurrentQueue`中,如果队列中没有任何项,`TryDequeue`将返回false。
如果队列为空,我希望我的队列能够等待直到有新的项被添加到队列中,并且它会出队这个新项,然后继续这个过程。
在C# 4.0中,我应该使用`Monitor.Enter`、`Wait`、`Pulse`还是其他更好的选项?

3
嗯,让我感到惊讶的是这个并没有方便地公开展示出来,这似乎是一个常见的使用情况... - Marc Gravell
我认为在4.0版本中,BCL类应该会暴露这个功能,但如果没有其他的出现,也许可以看看这个答案(https://dev59.com/53RB5IYBdhLWcg3wvpic#530228),其中TryDequeue将会阻塞直到*要么*有一个项目(返回“true”)*或者*队列为空并且明确关闭(返回“false”)。 - Marc Gravell
如果队列为空,我需要我的线程等待,直到有新的项目添加到队列中并将其出队,然后该过程将继续进行。 - C-va
@Marc:我需要恢复我的线程,以便在添加新项时立即出队。我该如何实现这一点? - C-va
1
以上的代码可以实现该功能,但看起来 "Damien_The_Unbeliever" 在这里有答案。 - Marc Gravell
旧问题,但是来到这里的人可能也想查看这个答案,它建议使用BufferBlock<T>,可以实现异步等待。 - Jono Job
3个回答

55

这不是BlockingCollection的设计目的吗?

据我理解,您可以使用其中一个来封装ConcurrentQueue,然后调用Take


3
你可以使用 BlockingCollection
做类似这样的事情:
private BlockingCollection<string> rowsQueue;
private void ProcessFiles() {
   this.rowsQueue = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000);
   ReadFiles(new List<string>() { "file1.txt", "file2.txt" });


   while (!this.rowsQueue.IsCompleted || this.rowsQueue.Count > 0)
   {
       string line = this.rowsQueue.Take();

       // Do something
   }
}

private Task ReadFiles(List<string> fileNames)
{
    Task task = new Task(() =>
    {
        Parallel.ForEach(
        fileNames,
        new ParallelOptions
        {
            MaxDegreeOfParallelism = 10
        },
            (fileName) =>
            {
                using (StreamReader sr = File.OpenText(fileName))
                {
                    string line = String.Empty;
                    while ((line = sr.ReadLine()) != null)
                    {
                           this.rowsQueue.Add(line);
                    }
                }
            });

        this.rowsQueue.CompleteAdding();
    });

    task.Start();

    return task;
}

-2

您可以定期检查队列中的元素数量,当元素数量大于零时,使用例如ManualResetEvent发出信号给出队线程,直到队列为空为止。

以下是此操作的伪代码:

检查线程:

while(true)
{
  int QueueLength = 0;
  lock(Queue)
  {
    queueLength = Queue.Length;
  }

  if (Queue.Length > 0)
  {
    manualResetEvent.Set();
  }
  else
  {
    Thread.Sleep(...);
  }       
}    

出队线程:

while(true)
{
  if(manualResetEvent.WaitOne(timeout))
  {
    DequeueUntilQueueEmpty();
  }
}

在DequeueUntilQueueEmpty中也考虑使用锁。

1
我目前正在使用Monitor.enter、wait和pulseall来使线程等待和发出信号。ManualResetEvent是更好的选择吗? - C-va
我目前正在使用Monitor.enter、wait和pulseall来使线程等待和信号传递。ManualResetEvent是更好的选择吗? - C-va
ManualResetEvent是轻量级选项,Monitor是重量级选项。 - Tomas Walek

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接