Quartz任务 - 全局禁止并发执行?

3
使用Quartz,我想让几个任务(大约10个)按顺序执行 - 即不是并发执行。它们应该在“会计日更改”事件发生后执行,但由于它们都访问同一个数据库,所以我不希望它们同时开始。相反,我希望它们按顺序执行(顺序无关紧要)。
我有一个想法,将它们放入一个组中 - 比如说“account_day_change_jobs”,然后以某种方式配置Quartz来完成剩下的工作 :-) 这意味着按顺序运行组中的所有作业。我尝试了API文档(1.8和2.1),尝试了Google,但没有找到任何信息。
这是否可能?这是否合理?还有其他实现我想要的行为的想法吗?
非常感谢任何想法 :-) 汉斯
3个回答

2
下面的触发器监听器类应该重新安排任何试图在监听器已被配置为运行另一个作业时执行的作业。 我只进行了轻微的测试,但对于简单情况应该是合适的。
public class SequentialTriggerListener extends TriggerListenerSupport {

private JobKey activeJob;
private Scheduler activeScheduler;
private Queue<JobDetail> queuedJobs = new ConcurrentLinkedQueue<JobDetail>();

public String getName() {
    return "SequentialTriggerListener";
}

public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {
    synchronized (this) {
        if (activeJob != null) {
            getLog().debug("Queueing Sequential Job - " + context.getJobDetail().getKey().getName());
            JobDetail jd = context.getJobDetail();
            activeScheduler = context.getScheduler();
            jd = JobBuilder.newJob().usingJobData(jd.getJobDataMap()).withIdentity(getName() + ":" + jd.getKey().getName(), jd.getKey().getGroup())
                    .ofType(jd.getJobClass()).build();
            queuedJobs.add(jd);
            return true;
        } else {
            activeJob = trigger.getJobKey();
            getLog().debug("Executing Job - " + activeJob.getName());
            return false;
        }
    }
}

public void triggerMisfired(Trigger trigger) {
    triggerFinalized(trigger);
}

public void triggerComplete(Trigger trigger, JobExecutionContext context, CompletedExecutionInstruction triggerInstructionCode) {
    triggerFinalized(trigger);
}

protected void triggerFinalized(Trigger trigger) {
    synchronized (this) {
        try {
            if (trigger.getJobKey().equals(activeJob)) {
                getLog().debug("Finalized Sequential Job - " + activeJob.getName());
                activeJob = null;
                JobDetail jd = queuedJobs.poll();
                if (jd != null) {
                    getLog().debug("Triggering Sequential Job - " + jd.getKey().getName());
                    activeScheduler.scheduleJob(jd,TriggerBuilder.newTrigger().forJob(jd).withIdentity("trigger:" + jd.getKey().getName(), jd.getKey().getGroup())
                            .startNow().withSchedule(SimpleScheduleBuilder.simpleSchedule().withRepeatCount(0).withIntervalInMilliseconds(1)).build());
                }
            } else {
                // this should not occur as the trigger finalizing should be the one we are tracking.
                getLog().warn("Sequential Trigger Listener execution order failer");
            }
        } catch (SchedulerException ex) {
            getLog().warn("Sequential Trigger Listener failure", ex);
        }
    }

}

}


1

我很久以前使用过quartz,但是我会尝试注册两个作业监听器来监听两个不同的组。

基本思路是从一个组/列表("todayGroup")触发一个作业,"todayGroup"作业监听器检测完成情况(好或坏)。然后启动列表中的下一个作业。然而,它将刚完成的作业保存在调度程序下的("tomorrowGroup")中。

public class MyTodayGroupListener extends JobListenerSupport {

    private String name;

    private static String GROUP_NAME = "todayGroup";

    public MyOtherJobListener(String name) {
    this.name = name;
    }

    public String getName() {
    return name;
    }


    @Override
    public void jobWasExecuted(JobExecutionContext context,
        JobExecutionException jobException) {
        Scheduler sched = context.getScheduler();
        // switch the job to the other group so we don't run it again today.
        JobDetail current = context.getJobDetail();
        JobDetail tomorrows  = current.getJobBuilder().withIdentity(current.getKey().getName(), "tomorrow").build();
        sched.addJob(tomorrows,true);

        //see if there is anything left to run
        Set<JobKey> jobKeys = sched.getJobKeys(groupEquals(GROUP_NAME ));
        Iterator<JobKey> nextJob = null;
        if(jobKeys != null && !jobKeys.isEmpty() ){
            nextJob = jobKeys.iterator();
        }
        if(nextJob != null){
            // Define a Trigger that will fire "now" and associate it with the first job from the list
            Trigger trigger = newTrigger()
                .withIdentity("trigger1", "group1")
                .startNow()
                .forJob(nextJob =.next())
                .build();

            // Schedule the trigger
            sched.scheduleJob(trigger);
        }

    }
}

同样,您需要两个“组触发器”,它们将在您想要的给定时间从各自的组中触发第一个作业。

0
public class TriggerGroupDisallowConcurrentExecutionTriggerListener : ITriggerListener
{
    private IScheduler activeScheduler;
    private readonly object locker = new object();
    private ConcurrentDictionary<string, JobsQueueInfo> groupsDictionary = new ConcurrentDictionary<string, JobsQueueInfo>();

    public string Name => "TriggerGroupDisallowConcurrentExecutionTriggerListener";

    public Task TriggerComplete(ITrigger trigger, IJobExecutionContext context, SchedulerInstruction triggerInstructionCode, CancellationToken cancellationToken = default)
    {
        //JobKey key = context.JobDetail.Key;
        //Console.WriteLine($"{DateTime.Now}: TriggerComplete. {key.Name} - {key.Group} - {trigger.Key.Name}");

        TriggerFinished(trigger, cancellationToken);
        return Task.CompletedTask;
    }

    public Task TriggerFired(ITrigger trigger, IJobExecutionContext context, CancellationToken cancellationToken = default)
    {
        //JobKey key = context.JobDetail.Key;
        //Console.WriteLine($"{DateTime.Now}: TriggerFired. {key.Name} - {key.Group} - {trigger.Key.Name}");

        return Task.CompletedTask;
    }

    public Task TriggerMisfired(ITrigger trigger, CancellationToken cancellationToken = default)
    {
        //JobKey key = trigger.JobKey;
        //Console.WriteLine($"{DateTime.Now}: TriggerMisfired. {key.Name} - {key.Group} - {trigger.Key.Name}");

        TriggerFinished(trigger, cancellationToken);
        return Task.CompletedTask;
    }

    public Task<bool> VetoJobExecution(ITrigger trigger, IJobExecutionContext context, CancellationToken cancellationToken = default)
    {
        //JobKey key = context.JobDetail.Key;
        //Console.WriteLine($"{DateTime.Now}: VetoJobExecution. {key.Name} - {key.Group} - {trigger.Key.Name}");

        lock (locker)
        {
            //if (!groupsDictionary.ContainsKey(context.JobDetail.Key.Group))
            //{
            groupsDictionary.TryAdd(context.JobDetail.Key.Group, new JobsQueueInfo { QueuedJobs = new ConcurrentQueue<IJobDetail>(), ActiveJobKey = null });
            var activeJobKey = groupsDictionary[context.JobDetail.Key.Group].ActiveJobKey;
            //}

            if (activeJobKey != null && activeJobKey != context.JobDetail.Key)
            {
                var queuedJobs = groupsDictionary[context.JobDetail.Key.Group].QueuedJobs;
                if (queuedJobs.Any(jobDetail => jobDetail.Key.Name == context.JobDetail.Key.Name) == true)
                {
                    //NOTE: Джоба уже есть в очереди, нет необходимости её добавлять повторно
                    return Task.FromResult(true);
                }
                else
                {
                    //NOTE: Добавить джобу в очередь на выполнение, и не выполнять её сейчас, т.к. она будет выполнена как только подойдёт её очередь
                    activeScheduler = context.Scheduler;
                    var newJob = JobBuilder.Create(context.JobDetail.JobType).WithIdentity(context.JobDetail.Key).Build();
                    queuedJobs.Enqueue(newJob);

                    return Task.FromResult(true);
                }
            }

            groupsDictionary[context.JobDetail.Key.Group].ActiveJobKey = trigger.JobKey;

            return Task.FromResult(false);
        }
    }

    protected void TriggerFinished(ITrigger trigger, CancellationToken cancellationToken = default)
    {
        lock (locker)
        {
            try
            {
                if (!groupsDictionary.ContainsKey(trigger.JobKey.Group))
                {
                    return;
                }

                var queuedJobs = groupsDictionary[trigger.JobKey.Group].QueuedJobs;
                if (queuedJobs.TryDequeue(out IJobDetail jobDetail))
                {
                    //Console.WriteLine($"dequeue - {jobDetail.Key.Name}");

                    var task = activeScheduler.TriggerJob(jobDetail.Key, cancellationToken);
                    task.ConfigureAwait(false);
                    task.Wait(cancellationToken);

                    groupsDictionary[trigger.JobKey.Group].ActiveJobKey = jobDetail.Key;
                }
                else
                {
                    groupsDictionary[trigger.JobKey.Group].ActiveJobKey = null;
                }
            }
            catch (SchedulerException ex)
            {
                throw;
            }
        }
    }

    private class JobsQueueInfo
    {

        public ConcurrentQueue<IJobDetail> QueuedJobs { get; set; }

        public JobKey ActiveJobKey { get; set; }
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接