这一切都发生在一个Windows服务中。
我有一个 Queue<T>
(实际上是一个 ConcurrentQueue<T>
)存储着等待处理的项目。但是,我不想一次只处理一个项目,我希望同时处理 n 个项目,其中 n 是可配置的整数。
我该如何使用任务并行库(TPL)完成这个操作?
我知道 TPL 会代表开发人员对集合进行划分以进行并发处理,但不确定是否这正是我所需要的功能。我是多线程和 TPL 的新手。
这一切都发生在一个Windows服务中。
我有一个 Queue<T>
(实际上是一个 ConcurrentQueue<T>
)存储着等待处理的项目。但是,我不想一次只处理一个项目,我希望同时处理 n 个项目,其中 n 是可配置的整数。
我该如何使用任务并行库(TPL)完成这个操作?
我知道 TPL 会代表开发人员对集合进行划分以进行并发处理,但不确定是否这正是我所需要的功能。我是多线程和 TPL 的新手。
TaskFactory
的扩展方法。public static class TaskFactoryExtension
{
public static Task StartNew(this TaskFactory target, Action action, int parallelism)
{
var tasks = new Task[parallelism];
for (int i = 0; i < parallelism; i++)
{
tasks[i] = target.StartNew(action);
}
return target.StartNew(() => Task.WaitAll(tasks));
}
}
ConcurrentQueue<T> queue = GetQueue();
int n = GetDegreeOfParallelism();
var task = Task.Factory.StartNew(
() =>
{
T item;
while (queue.TryDequeue(out item))
{
ProcessItem(item);
}
}, n);
task.Wait(); // Optionally wait for everything to finish.
这里有另一种使用Parallel.ForEach
的方法。这种方法的问题在于,可能并不能保证您所设定的最大并行程度得到实现。您仅指定了允许的最大程度,而不是绝对数量。
ConcurrentQueue<T> queue = GetQueue();
int n = GetDegreeOfParallelism();
Parallel.ForEach(queue, new ParallelOptions { MaxDegreeOfParallelism = n },
(item) =>
{
ProcessItem(item);
});
BlockingCollection<T>
而不是ConcurrentQueue<T>
,然后您可以启动任意数量的消费者线程,并使用BlockingCollection
的Take
方法。如果集合为空,则Take
方法将自动在调用线程中阻塞,等待添加项目,否则线程将并行消耗所有队列项。但是,由于您的问题提到了TPL的使用,因此当与BlockingCollection一起使用时,Parallel.ForEach
存在一些问题,请查看此文章以了解更多详细信息。所以您需要自己管理消费者线程的创建。new Thread(/*consumer method*/)
或new Task()
...BlockingCollection
而不是直接使用ConcurrentQueue
。
这里有一个例子:
public class QueuingRequestProcessor
{
private BlockingCollection<MyRequestType> queue;
public void QueuingRequestProcessor(int maxConcurrent)
{
this.queue = new BlockingCollection<MyRequestType>(maxConcurrent);
Task[] consumers = new Task[maxConcurrent];
for (int i = 0; i < maxConcurrent; i++)
{
consumers[i] = Task.Factory.StartNew(() =>
{
// Will wait when queue is empty, until CompleteAdding() is called
foreach (var request in this.queue.GetConsumingEnumerable())
{
Process(request);
}
});
}
}
public void Add(MyRequest request)
{
this.queue.Add(request);
}
public void Stop()
{
this.queue.CompleteAdding();
}
private void Process(MyRequestType request)
{
// Do your processing here
}
}
maxConcurrent
定义了同时处理多少个请求。
foreach (Item item in _collection.GetConsumingEnumerable())
,如果集合为空,它也会在那里阻塞等待添加项。 - Jalal Said