傻瓜式线程队列

6
我有一个很常见的线程处理需求:
- 我有100个相同的任务需要完成 - 所有任务彼此独立 - 我想同时处理最多15个任务 - 每当一个任务完成时,就会启动一个新的任务,直到所有任务都完成
假设每个任务在完成时都会触发一个事件(我正在使用BackgroundWorker类),我可以想到几种方法来实现这个需求,但我不确定哪种方法是“正确”的。我希望你们这些大牛能指点一下方向。
解决方案1:在我的Main()函数中使用while(continue){Threading.Sleep(1000);}循环。Job_Completed事件处理程序中的代码将在没有剩余任务要排队且所有排队的任务已完成时设置continue = false。我以前用过这个解决方案,虽然它似乎工作得很好...但对我来说有点“奇怪”。
解决方案2:在我的Main()函数中使用Application.Run()。同样,在Job_Completed事件处理程序中的代码将在没有剩余任务要排队且所有排队的任务已完成时调用Application.Exit()。
解决方案3:使用ThreadPool,将所有500-1000个请求排队,让它们同时运行10个(SetMaxThreads),并以某种方式等待它们全部完成。
在所有这些解决方案中,基本思路是每当另一个任务完成时启动一个新任务,直到没有任务为止。因此,问题不仅是等待现有任务完成,还要等待没有任何挂起的任务就绪。如果ThreadPool是正确的解决方案,那么等待ThreadPool完成所有排队的项目的正确方法是什么?
我认为我最大的困惑在于我不明白事件是如何能够从我的Main()函数中触发的。显然,它们可以,我只是不理解从Windows消息循环的角度来看它的机制。解决这个问题的正确方法是什么?为什么?

似乎大多数建议都集中在ThreadPool风格的解决方案上...那么我提出的SOLUTION 1和SOLUTION 2呢(基本上是等待一个条件被事件修改)?这种方式有什么本质上的问题,还是只是因为.NET提供了ThreadPool所以不必要?像这样的代码看起来很奇怪:while(continue) Threading.Sleep(1000);...在Main()函数中等待事件触发。在这样的代码中...我的事件到底何时处理...是在Sleep()调用中吗? - Casey Gay
8个回答

3
尽管其他答案也不错,如果您需要另一个选项(您永远不能拥有足够的选项),那么这个想法怎么样呢?
只需将每个工作的数据放入一个FIFO堆栈中的结构中。
创建15个线程。
每个线程将从堆栈中获取下一个任务并弹出它。
当线程完成处理时,获取下一个任务,如果堆栈为空,则线程会死亡或只是休眠等待。
唯一的复杂性是使弹出处于关键部分(同步读取/弹出),但这很容易解决。

2

关于“等待它们全部完成”的问题

ManualResetEvent 是你的好朋友,在开始大批量操作之前,先创建一个这样的实例,在你的主线程上等待它,当工作完成时在后台操作的结尾设置它。

另一个选项是手动创建线程并进行 foreach 循环,使用 thread.Join() 函数。

你可以使用这个方法(我在测试过程中使用)

     private void Repeat(int times, int asyncThreads, Action action, Action done) {
        if (asyncThreads > 0) {

            var threads = new List<Thread>();

            for (int i = 0; i < asyncThreads; i++) {

                int iterations = times / asyncThreads; 
                if (i == 0) {
                    iterations += times % asyncThreads;                    
                }

                Thread thread = new Thread(new ThreadStart(() => Repeat(iterations, 0, action, null)));
                thread.Start();
                threads.Add(thread);
            }

            foreach (var thread in threads) {
                thread.Join();
            }

        } else {
            for (int i = 0; i < times; i++) {
                action();
            }
        }
        if (done != null) {
            done();
        }
    }

使用方法:

// Do something 100 times in 15 background threads, wait for them all to finish.
Repeat(100, 15, DoSomething, null)

1

我会使用 Task Parallel Library。

你可以将你的任务作为单个简单的 Parallel.For 循环完成,它会自动进行相当干净的管理。如果你等不及 C# 4 和微软的实现,则临时解决方法是编译并使用 Mono Implementation of TPL。(我个人更喜欢 MS 实现,特别是较新的 beta 版本,但 Mono 的实现今天就已经是功能齐备并可重新分发的了。)


1

当您将工作项排队到线程队列中时,应该会返回一个等待句柄。将它们全部放入数组中,然后可以将其作为参数传递给WaitAll()函数。


好主意,但你会怎么做呢?QueueUserWorkItem返回一个布尔值。 - Grokys

1

我会使用线程池。

在开始运行作业之前,创建一个ManualResetEvent和一个int计数器。将每个作业添加到线程池中,每次增加计数器。

在每个作业结束时,递减计数器,并在计数器为零时,在事件上调用Set()

在主线程中,调用WaitOne()等待所有作业完成。


0
这是我处理它的伪代码(没有利用线程池,所以可能有更好的解决方案):
main
{
    create queue of 100 jobs
    create new array of 15 threads
    start threads, passing each the job queue
    do whatever until threads are done
}

thread(queue)
{
    while(queue isn't empty)
    {
        lock(queue) { if queue still isn't empty dequeue a thing }
        process the thing
    }

    queue is empty so exit thread
}

编辑:如果您的问题是如何判断线程何时完成,并且您正在使用普通的C#线程(而不是ThreadPooled线程),则可以在每个线程上调用Thread.Join(),并带有可选的超时,它将仅在线程完成后返回。如果您想跟踪完成的线程数量而不会卡在其中一个上面,可以按以下方式循环遍历它们:

for(int i = 0; allThreads.Count > 0; i++)
{
    var thisThread = allThreads[i % threads.Count];
    if(thisThread.Join(timeout)) // something low, maybe 100 ms or something
        allThreads.Remove(thisThread);
}

0

线程池可能是一个不错的选择。使用SetMaxThreads方法可以限制正在执行的线程数量。但是,这会限制进程/AppDomain的最大线程数。如果进程作为服务运行,我不建议使用SetMaxThreads

private static ManualResetEvent manual = new ManualResetEvent(false);
private static int count = 0;

public void RunJobs( List<JobState> states )
{
     ThreadPool.SetMaxThreads( 15, 15 );

     foreach( var state in states )
     {
          Interlocked.Increment( count );
          ThreadPool.QueueUserWorkItem( Job, state );
     }

    manual.WaitOne();
}

private static void Job( object state )
{
    // run job
    Interlocked.Decrement( count );
    if( Interlocked.Read( count ) == 0 ) manual.Set();
}

0

微软的响应式框架非常适合这个:

Action[] jobs = new Action[100];

var subscription =
    jobs
        .ToObservable()
        .Select(job => Observable.Start(job))
        .Merge(15)
        .Subscribe(
            x => Console.WriteLine("Job Done."),
            () => Console.WriteLine("All Jobs Done."))

完成。

只需使用 NuGet 安装 "System.Reactive"。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接