C#并行foreach如何平均完成任务

9
我将使用C# Parallel.ForEach来处理超过一千个数据子集。每个子集的处理时间取决于其大小,通常需要5-30分钟。在我的电脑上,选择以下选项:
ParallelOptions po = new ParallelOptions();
po.MaxDegreeOfParallelism = Environment.ProcessorCount

我将获得8个并行进程。据我理解,进程在并行任务之间平均分配(例如,第一个任务获取1,9,17等作业编号,第二个任务获取2,10,18等作业编号)。因此,一个任务可以比其他任务更快地完成自己的工作,因为这些数据集所需的时间比其他数据集少。
问题是四个并行任务在24小时内完成它们的工作,但最后一个任务需要48小时才能完成。是否有机会组织并行性,使所有并行任务都以相同的速度完成?这意味着所有并行任务都会继续工作,直到所有作业都完成?

最长的任务需要多长时间? - Sergey Kalinichenko
最长的单个循环时间为30分钟,最长并行任务的总处理时间约为48小时。 - Allan
3个回答

4

由于这些工作不相等,你不能将工作数量划分到处理器之间并在大致相同的时间内完成。我认为你需要8个工作线程来按顺序获取下一个任务。你必须在函数上使用锁定以获取下一个任务。

如果我说错了,请有人纠正我...给予一个工作线程像这样的函数:

public void ProcessJob()
{
    for (Job myJob = GetNextJob(); myJob != null; myJob = GetNextJob())
    {
        // process job
    }
}

获取下一个作业的函数如下:

private List<Job> jobs;
private int currentJob = 0;

private Job GetNextJob()
{
    lock (jobs)
    {
        Job job = null;
        if (currentJob < jobs.Count)
        {
            job = jobs[currentJob];
            currentJob++;
        }
        return job;
    }
}

1
+1 我不确定代码是否正确,但是随着工人空闲,从队列中取出任务的想法绝对是正确的。 - Sergey Kalinichenko

1
似乎没有现成的解决方案,需要自己创建。
我之前的代码是:
var ListOfSets = (from x in Database
           group x by x.SetID into z
           select new { ID = z.Key}).ToList();

ParallelOptions po = new ParallelOptions();
po.MaxDegreeOfParallelism = Environment.ProcessorCount;

Parallel.ForEach(ListOfSets, po, SingleSet=>
{
     AnalyzeSet(SingleSet.ID);
});

为了使所有CPU平均分配工作,我仍然使用Parallel来完成工作,但是不再使用ForEach,而是采用Matt的想法和For。新代码如下:
Parallel.For(0, Environment.ProcessorCount, i=>
{
    while(ListOfSets.Count() > 0)
    {
        double SetID = 0;
        lock (ListOfSets)
        {
            SetID = ListOfSets[0].ID;
            ListOfSets.RemoveAt(0);
        }
     AnalyzeSet(SetID);
    }
});

所以,感谢您的建议。

感谢分享你的解决方案。我不怎么做多线程编程,但下次我做的时候一定会去看看Parallel.For。 - Matt Miller
感谢您提供的解决方案。 注意:在您执行.Count()和锁定另一个线程之间,由于多线程的原因,最后一个列表项可能会被取走。在锁定时,您应该再次检查列表是否还有内容,如果没有,则退出。 - BlouBlou

1

有一种选择,正如其他人所建议的那样,就是管理自己的生产者消费者队列。我想指出使用BlockingCollection可以使这个过程非常容易。

BlockingCollection<JobData> queue = new BlockingCollection<JobData>();

//add data to queue; if it can be done quickly, just do it inline.  
//If it's expensive, start a new task/thread just to add items to the queue.
foreach (JobData job in data)
    queue.Add(job);

queue.CompleteAdding();

for (int i = 0; i < Environment.ProcessorCount; i++)
{
    Task.Factory.StartNew(() =>
    {
        foreach (var job in queue.GetConsumingEnumerable())
        {
            ProcessJob(job);
        }
    }, TaskCreationOptions.LongRunning);
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接