使用多线程和优先级的作业调度程序

3

我将要开始一个 C# .NET 4.0 项目,创建一个工作调度程序。

  1. 该工作没有到期日期,可能长时间运行,最长可达数天。
  2. 该工作有三个优先级:空闲、普通、关键;从低到高排序。
  3. 新的工作将持续不断地创建。
  4. 即使旧工作已经存在很长时间,带有更高优先级的新工作也应优先处理。
  5. 每个工作将由单个长时间运行的线程处理。
  6. 工作是可重入的。工作的状态会保存到数据库中,因此随时暂停或终止工作线程都可以。

我的计划是使用信号量,并将并发条目数设置为系统核心数。对于队列中的每个工作,都将创建一个新线程,并且所有线程在开始时都会被信号量阻止。

我的问题是如何确保高优先级线程在信号量调用 release() 方法时首先进入信号量。这可行吗?

我的第二个问题是如何使处于信号量内部的线程退出,并在更高优先级的工作线程出现时,将退出的工作线程返回到线程队列中等待信号量。这可行吗?

对于这两个问题,信号量是正确的方法吗?如果不是,您有什么建议?


1
我的调度方案纯粹基于优先级,某些作业需要根据新作业的到来而停止。Quartz.net能帮我吗? - nobody
1
不确定如何停止正在运行的作业,但是Quartz.Net确实具有基于优先级的处理。 - Dean Kuga
3个回答

4

我倾向于以下这样的做法...

首先,启动所有需要的线程:

for(int i=0; i < Environment.ProcessorCount; i++)
{
    Thread t = new Thread(RunWork);
    // setup thread
    t.Start();
    threads.Add(t);
}

您需要一个接口来描述任务的优先级。
interface ITask {
    PrioirtyType Prioirty { get; }
    bool Complete { get; }
    void PerformOneUnitOfWork();
}

然后创建一个队列管理对象。这可能会变得更加复杂,因为它可能需要与您的数据库进行同步等操作...

class MyQueue<TJob> where TJob : ITask 
{
    Queue<TJob> high, med, low;
    bool GetNextJob(ref TJob work)
    {
        if(work.Priority == PriorityType.High && !work.Complete)
            return true;
        lock(this)
        {
            if(high.Count > 0)
            {
                Enqueue(work);//requeue to pick back up later
                work = high.Dequeue();
                return true;
            }
            if(work.Priority == PriorityType.Med && !work.Complete)
                return true;
            if(med.Count > 0)
            {
                Enqueue(work);//requeue to pick back up later
                work = med.Dequeue();
                return true;
            }
            if(!work.Complete)
                return true;
            if(low.Count > 0)
            {
                work = low.Dequeue();
                return true;
            }
            work = null;
            return false;
        }

    void Enqueue(TJob work)
    {
        if(work.Complete) return;
        lock(this)
        {
            else if(work.Priority == PriorityType.High) high.Enqueue(work);
            else if(work.Priority == PriorityType.Med) med.Enqueue(work);
            else low.Enqueue(work);
        }
    }
}

最后,创建您的工作线程,像以下这样:
public void RunWork()
{
    ITask job;
    while(!_shutdown.WaitOne(0))
    {
        if(queue.GetNextJob(ref job))
            job.PerformOneUnitOfWork();
        else
            WaitHandle.WaitAny(new WaitHandle[] { _shutdown, queue.WorkReadyHandle });
    }
}

这太棒了,我有一种感觉这可能就是我要找的东西。你能告诉我 _shutdown 是什么数据类型吗? - nobody
我在MyQueue类中没有看到定义WorkReadyHandle的地方。我有什么遗漏吗?总的来说,我对C#还很陌生。 - nobody
_shutdown 可以是 ThreadManualResetEvent,很可能是 Thread。尤其是 .Net 4 已被引用,我仍然更喜欢使用 TPL 而不是手动线程管理。 - IAbstract
TPL在长时间运行的线程中是否高效,这些线程可能会持续几天? - nobody
我预期_shutdown和WorkReadyHandle是手动重置事件,我可能会将关闭和工作准备好的句柄都放在MyQueue类中并访问它们。如果您需要逐个关闭线程,则由您决定。 - csharptest.net
@CSharpTest 天哪,非常感谢。在查看了您的代码后,我现在绝对确定这就是我所寻找的内容。 - nobody

3
对于这两个问题,信号量是否是正确的方法?如果不是,你有什么建议?
这要看情况而定。通常情况下,每个线程最好有多个作业,因为许多(尤其是长时间运行的)工作项会花费大量时间等待 CPU 以外的其他事物。例如,如果您正在进行需要从 WCF 服务中提取数据等相关工作,则可能会花费大量时间被阻塞和空闲。
在这种情况下,最好让您的作业按需安排。在这种类型的场景中,使用线程池可能更好。
如果所有作业都是高 CPU,则您的方法可能是可取的。可以使用优先级队列来跟踪调度优先级并决定要运行哪个作业。
话虽如此,我可能不会使用信号量来解决这个问题。虽然它可以工作,但单个计数器(通过 Interlocked.Increment/Decrement 管理)和 ManualResetEvent 同样有效,并且更轻量级。

使用信号量似乎是实现任务并行库的信号,我认为。TPL和信号量基本上可以工作吗?TPL给你很多控制权。高优先级作业将被排队(例如索引0),将低优先级作业向下推。此时,TPL将允许您暂停最低优先级作业...???还是我误解了什么... - IAbstract
@IAbstract:TPL可能非常有用,尽管它没有内置的“暂停”功能,并且信号量的使用与TPL真的是分开的... - Reed Copsey
没错,有很多实现细节被省略了……而且我也不确定TPL中是否真的有暂停。但是,我相信可以持久化作业状态,从而暂停任务。 :) - IAbstract

-1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接