在C#中等待阻塞的集合(队列)减小大小

8
我正在处理一个具有以下工作流程的项目:
第一部分:
1. 异步到达事件并排队在阻塞队列中,我们将其称为Q1。 2. 线程从该队列中选择下一个可用项目。 3. 项目最终会并行运行N个任务。 4. 每个任务都在第二个队列上排队,我们将其称为Q2。 5. 当项目处理完成时,将读取队列中的下一个项目。
第二部分:
1. 另一个线程按顺序从Q2中读取一个对象,并处理结果。
因此,这里的问题是,第一个队列中的每个项目最终都会并行运行大量任务,并且每个任务都会将其结果排队。第二个队列必须以串行方式逐个处理,但是它过于拥挤。
我的问题是:
我需要一种机制,使处理Q1的线程等待,直到Q2中的项数低于特定阈值。 最好的方式是什么?是否有事件驱动的解决方案而不是轮询解决方案?
3个回答

9

对于 Q2,您可以使用 BlockingCollection<T> 代替 Queue<T>。如果设置了其BoundedCapacity,则在容量达到限制时,调用Q2.Add()将会被阻止。当 N 个任务无法添加到最终队列时,这将自动限制 Q1 的处理速度。


很酷,我已经在这里使用了阻塞集合,所以这只是一个小改变 :) - John Humphreys
我会试一试,如果能够正常工作就接受。这可能是个不错的解决方案,因为它可以阻止我的任务处理程序{N}个线程在排队结果之前返回,从而一开始就可以阻止更多的事件被处理 :) - John Humphreys

2

我假设你会不定期地收到一大批数据,在这段时间里Q2可以赶上。你是否考虑过通过使用有限的线程池来限制从Q1派生的并发线程数量?

如果可以在任务到达时轻松确定作业大小,那么我认为您可以从多个线程池中受益。您可以使用少量线程处理大型作业,并准备好大量线程来处理小型作业。甚至第三个中间队列也可能会有所裨益。


我们已经在使用线程池了。问题是很难判断要使用多少个线程。有时候我们会收到80k个小事件,系统运行良好(最后一个队列不会积压,因为结果集很小/快速上传到我们的分布式缓存)。但有时候,我们只收到100个事件,但由于结果集非常大,上传需要很长时间,导致系统挂起。因此,直接限制线程池的大小会使得当结果集很小时运行更糟糕,但确实可以解决结果集太大时的洪水问题。+1,这是一个好的解决方案,考虑到我提供的信息。 - John Humphreys

1

你的问题似乎是TPL Dataflow库可以完美解决的一个例子。如果你愿意尝试一下,这就是它的工作方式(当然,这只是一个非常简单的例子):

TransformBlock<int, bool> transform = new TransformBlock<int, bool>(i => i > 5 ? true : false,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
ActionBlock<bool> resultBlock = new ActionBlock<bool>(b => Console.WriteLine("My result is : " + b),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });
transform.LinkTo(resultBlock);

你正在定义一个转换块,这将使你的转换生效(这就是你的 Q1),你可以将它的并行级别设置为你想要使用的任务数量。
然后,你创建了第二个块(作为你的 Q2),它的 BoundedCapacity 被设置,并且每个消息都会同步处理,为每个元素调用一个操作。这个块可以被任何其他块替换,比如 BufferBlock,它允许你按需从中轮询。

我现在没时间玩这个,但它看起来很有趣;我会另外再试试 :) 谢谢啊。 - John Humphreys

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接