等待一个BlockingCollection队列被后台线程清空,如果等待时间过长,则设置超时时间?

11
在C#中,我想知道是否有可能等待直到一个BlockingCollection被后台线程清除,如果等待时间过长则设置超时。目前我所使用的临时代码让我感觉有些不够优雅(因为使用Thread.Sleep好像并不是一个好习惯):
while (_blockingCollection.Count > 0 || !_blockingCollection.IsAddingCompleted)
{
    Thread.Sleep(TimeSpan.FromMilliseconds(20));
    // [extra code to break if it takes too long]
}

2
我假设你是在调用BlockingCollection.CompleteAdding()之后才这样做的 - 对吗?无论如何,你现在所做的没有更好的方法。 - Matthew Watson
@Matthew Watson。好观点;我添加了一个额外的术语,以确保没有排队的项目尚未添加到队列中。我还澄清了问题,提到后台线程正在清除队列。 - Contango
1
“等待”具体是什么意思?如果您正在使用TryTake,确实可以指定超时时间。如果您正在使用枚举器,则受限制较多,但原则上您可以创建一个使用TryTake的枚举器包装器。 - Holstebroe
1
@Contango 我认为你的循环条件应该是_blockingCollection.Count != 0 || !blockingCollection.IsAddingCompleted(注意使用||而不是&&)。 - Matthew Watson
@Holstebroe 我希望我能使用这种方法来修复它,但目前代码正在等待后台线程清空队列作为单元测试过程的一部分,因此它实际上无法使用 TryTake。 - Contango
3个回答

7
您可以在消费线程中使用GetConsumingEnumerable()foreach来确定队列何时为空,然后设置一个ManualResetEvent,主线程可以检查该事件以判断队列是否为空。GetConsumingEnumerable()返回一个枚举器,它会在空队列上终止之前检查是否已调用CompleteAdding()
示例代码:
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    internal class Program
    {
        private void run()
        {
            Task.Run(new Action(producer));
            Task.Run(new Action(consumer));

            while (!_empty.WaitOne(1000))
                Console.WriteLine("Waiting for queue to empty");

            Console.WriteLine("Queue emptied.");
        }

        private void producer()
        {
            for (int i = 0; i < 20; ++i)
            {
                _queue.Add(i);
                Console.WriteLine("Produced " + i);
                Thread.Sleep(100);
            }

            _queue.CompleteAdding();
        }

        private void consumer()
        {
            foreach (int n in _queue.GetConsumingEnumerable())
            {
                Console.WriteLine("Consumed " + n);
                Thread.Sleep(200);
            }

            _empty.Set();
        }

        private static void Main()
        {
            new Program().run();
        }

        private BlockingCollection<int> _queue = new BlockingCollection<int>();

        private ManualResetEvent _empty = new ManualResetEvent(false);
    }
}

6

如果在你的消费线程中写了以下类似的内容会怎么样:

var timeout = TimeSpan.FromMilliseconds(10000);
T item;
while (_blockingCollection.TryTake(out item, timeout))
{
     // do something with item
}
// If we end here. Either we have a timeout or we are out of items.
if (!_blockingCollection.IsAddingCompleted)
   throw MyTimeoutException();

3

如果您可以重新设计使得在集合为空时可以设置事件,那么您可以使用等待句柄来等待集合变为空:

static void Main(string[] args)
{
    BlockingCollection<int> bc = new BlockingCollection<int>();
    ManualResetEvent emptyEvent = new ManualResetEvent(false);
    Thread newThread = new Thread(() => BackgroundProcessing(bc, emptyEvent));
    newThread.Start();
    //wait for the collection to be empty or the timeout to be reached.
    emptyEvent.WaitOne(1000);

}
static void BackgroundProcessing(BlockingCollection<int> collection, ManualResetEvent emptyEvent)
{
    while (collection.Count > 0 || !collection.IsAddingCompleted)
    {
        int value = collection.Take();
        Thread.Sleep(100);
    }
    emptyEvent.Set();
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接