使用任务并行库同时仅处理n个项目

8

这一切都发生在一个Windows服务中。

我有一个 Queue<T> (实际上是一个 ConcurrentQueue<T>)存储着等待处理的项目。但是,我不想一次只处理一个项目,我希望同时处理 n 个项目,其中 n 是可配置的整数。

我该如何使用任务并行库(TPL)完成这个操作?

我知道 TPL 会代表开发人员对集合进行划分以进行并发处理,但不确定是否这正是我所需要的功能。我是多线程和 TPL 的新手。

3个回答

4
这里有一个想法,涉及创建一个 TaskFactory 的扩展方法。
public static class TaskFactoryExtension
{
    public static Task StartNew(this TaskFactory target, Action action, int parallelism)
    {
        var tasks = new Task[parallelism];
        for (int i = 0; i < parallelism; i++)
        {
            tasks[i] = target.StartNew(action);
        }
        return target.StartNew(() => Task.WaitAll(tasks));
    }
}

然后您的调用代码将如下所示。
ConcurrentQueue<T> queue = GetQueue();
int n = GetDegreeOfParallelism();
var task = Task.Factory.StartNew(
  () =>
  {
    T item;
    while (queue.TryDequeue(out item))
    {
      ProcessItem(item);
    }
  }, n);
task.Wait(); // Optionally wait for everything to finish.

这里有另一种使用Parallel.ForEach的方法。这种方法的问题在于,可能并不能保证您所设定的最大并行程度得到实现。您仅指定了允许的最大程度,而不是绝对数量。

ConcurrentQueue<T> queue = GetQueue();
int n = GetDegreeOfParallelism();
Parallel.ForEach(queue, new ParallelOptions { MaxDegreeOfParallelism = n },
  (item) =>
  {
    ProcessItem(item);    
  });

4
请使用BlockingCollection<T>而不是ConcurrentQueue<T>,然后您可以启动任意数量的消费者线程,并使用BlockingCollectionTake方法。如果集合为空,则Take方法将自动在调用线程中阻塞,等待添加项目,否则线程将并行消耗所有队列项。但是,由于您的问题提到了TPL的使用,因此当与BlockingCollection一起使用时,Parallel.ForEach存在一些问题,请查看文章以了解更多详细信息。所以您需要自己管理消费者线程的创建。new Thread(/*consumer method*/)new Task()...

BlockingCollection违背了队列的目的。当正在迭代时,我无法从阻塞集合中删除项目。 - Ronnie Overby
2
不,你可以使用它的GetConsumingEnumerable。例如:foreach (Item item in _collection.GetConsumingEnumerable()),如果集合为空,它也会在那里阻塞等待添加项。 - Jalal Said

1
我还建议使用BlockingCollection而不是直接使用ConcurrentQueue

这里有一个例子:

public class QueuingRequestProcessor
{
  private BlockingCollection<MyRequestType> queue;

  public void QueuingRequestProcessor(int maxConcurrent)
  {
    this.queue = new BlockingCollection<MyRequestType>(maxConcurrent);

    Task[] consumers = new Task[maxConcurrent];

    for (int i = 0; i < maxConcurrent; i++)
    {
      consumers[i] = Task.Factory.StartNew(() =>
      {
        // Will wait when queue is empty, until CompleteAdding() is called
        foreach (var request in this.queue.GetConsumingEnumerable())
        {
          Process(request);
        }
      });
    }
  }

  public void Add(MyRequest request)
  {
    this.queue.Add(request);
  }

  public void Stop()
  {
    this.queue.CompleteAdding();
  }

  private void Process(MyRequestType request)
  {
    // Do your processing here
  }
}

请注意,构造函数中的maxConcurrent定义了同时处理多少个请求。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接