多线程中的队列和出队操作

3

我正在创建一个索引器,它会将需要处理的项目加入队列。索引器将把项目添加到其处理器中。例如,它会添加100个项目,然后3分钟内不再添加项目,接着再添加50个项目。

public class Processer
{
    private ConcurrentQueue<Item> items;

    public void AddItem(Item item)
    {
        this.items.Enqueue(item);
    }
}

这些项目将以随机间隔出现,因此我将创建一个单独的线程来出队并处理这些项目。

使用哪个选项最好?

  1. Don't use a Collection, but use the ThreadPool:

    public void AddItem(Item item)
    {
        ThreadPool.QueueUserWorkItem(function, item);
    }
    

    This will automatically create a Queue, and process the items, but I have less control, when 20 items are found, they will almost stop my indexer to run and first finish this thread pool

  2. Use a long running Task:

    public Processer()
    {
        this.task = Task.Factory.StartNew(() => DequeueItems(),
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }
    
    public DequeueItems()
    {
        while(true)
        {
            Item item = null;
            while(this.items.TryDequeue(out item)
            {
                this.store.ExecuteIndex((AbstractIndexCreationTask)item);
            }
    
            Thread.Sleep(100); 
        }
    }
    

    But I hate the while() and thread.sleep I've got to use, since the enumerable will dry up after some time, and it will need recheck if there are new items.

  3. Use a short running task:

    public Processer()
    {
    
    }
    private void Run()
    {
        this.task = Task.Factory.StartNew(() => DequeueItems(),
            CancellationToken.None,
            TaskCreationOptions.PreferFairness,
            TaskScheduler.Default);
    }
    public void AddItem(Item item)
    {
        this.items.Add(item);
        if(this.task == null || this.task.isCompleted)
            this.Run();
    }
    public DequeueItems()
    {
        Item item = null;
        while(this.items.TryDequeue(out item)
        {
            this.store.ExecuteIndex((AbstractIndexCreationTask)item);
        }
    }
    

    This might be nicer? But starting a thread is a "expensive" operation, and I don't know if I can miss items since I check IsCompleted, which can be in the process of ending the while loop and this way missing 1 item. But it doesn't sleep, and use a dirty while loop.

  4. Your option, since MSDN recommends to use the TPL, I thought not to use Threads, but maybe there are better ways dealing with this problem

更新日志

  1. 更改为BlockingCollection
  2. 又改回了ConcurrentQueue

我检查过的一些事情:


2
  1. 在生产者-消费者场景中,使用 BlockingCollection<T>ConcurrentQueue<T>
- Patryk Ćwiek
1
@user2331234:不,队列确实是问题所在。它不是线程安全的,而BlockingCollection<T>则是专门为像你这样的生产者/消费者情况而设计的。 - Jon Skeet
GetConsumingEnumerable最终会结束循环,不是吗?所以当我的索引器添加100个项目时,处理器最终完成索引。完成后,线程被终止,新项目将不再被索引。我的索引器可以找到100个项目,然后等待3分钟(搜索新项目),然后再添加另外100个项目。因此,我使用了while sleep。顺便说一下,当您发布反应时,我已经将其编辑为使用阻塞集合。 - user2331234
1
我在发表评论后才看到了你的编辑。我的理解是GetConsumingEnumerable会一直等待,直到调用CompleteAdding,所以我仍然不理解Thread.Sleepwhile循环的作用。 - Daniel Kelley
Daniel,你说得对,我已经在上面的示例中进行了更改。这是停止有关我的队列构造的评论的唯一方法。当您添加项目并循环遍历它们(一次)时,BlockingCollection会很有帮助。在我的情况下,项目将在应用程序的生命周期内添加,并且必须尽可能地处理项目。 - user2331234
显示剩余2条评论
2个回答

3
我认为这里最简单的解决方案是使用 BlockingCollection(可能使用其GetConsumingEnumerable()),以及一个长时间运行的Task。当没有任务需要处理时,这将浪费一个 Thread,但一个浪费掉的 Thread 并不那么糟糕。
如果您无法承受浪费该 Thread,那么可以尝试使用您提到的第三种方法。但是您必须非常小心地使其线程安全。例如,在您的代码中,如果 Task 没有运行并且同时从两个线程调用 AddItem(),则会创建两个 Task,这几乎肯定是错误的。
如果您正在使用 .Net 4.5,则另一种选择是使用 TPL Dataflow 中的 ActionBlock。使用它,您不会浪费任何线程,并且不必编写艰难的线程安全代码。

谢谢,我会添加锁来确保它只运行一次。很遗憾我不能使用.NET 4.5,因为我正在为Windows Server 2003+创建软件... - user2331234

2
我认为Semaphore可能是适合你的东西。你可以在这里找到一个相当好的解释:这里
此外,我建议使用ConcurrentQueue

就像这个问题一样(在我的“已检查列表”中),我想知道是否使用TPL更好,但还是感谢提供的示例。确实可能是限制最大线程数的不错方式,但在我的情况下,它会打开很多挂起的线程,我不知道这是否会导致异常。队列中可能有1000个或更多项,每个项都将直接获得自己的线程,等待sem被释放。 - user2331234
你能解释一下在这里具体如何使用“Semaphore”吗? - svick

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接