Node.js中使用Kue实现独特的工作机制。

4
我希望如果系统中已经存在相同的作业,那么jobs.create会失败。是否有办法实现这一点?
我需要每24小时运行相同的作业,但有些作业可能需要超过24小时才能完成,因此在添加作业之前,我需要确保该作业尚未存在于系统中(活动、排队或失败)。
更新: 好的,我将简化问题,以便能够在这里解释。假设我有一个分析服务,我必须每天向我的用户发送一份报告。有时候(只有很少的情况,但是这是可能的),完成这些报告需要几个小时,甚至超过一天。
我需要一种方法来知道当前正在运行的作业,以避免重复作业。我在kue API中找不到任何关于哪些作业正在运行的信息。还需要某种事件触发器来获取更多的作业,然后调用我的getMoreJobs生产者。
也许我的方法是错误的,如果是这样,请告诉我更好的解决方法。
这是我的简化代码:
var kue = require('kue'),   
    cluster = require('cluster'),
    numCPUs = require('os').cpus().length;

numCPUs = CONFIG.sync.workers || numCPUs; 

var jobs = kue.createQueue();

if (cluster.isMaster) {
    console.log('Starting master pid:' + process.pid);
    jobs.on('job complete', function(id){
    kue.Job.get(id, function(err, job){
        if (err || !job) return;
        job.remove(function(err){
            if (err) throw err;
            console.log('removed completed job #%d', job.id);
        });
    });

    function getMoreJobs() {
        console.log('looking for more jobs...');
        getOutdateReports(function (err, reports) {
            if (err) return setTimeout(getMoreJobs, 5 * 60 * 60 * 1000);

            reports.forEach(function(report) {
                jobs.create('reports', {
                    id: report.id,
                    title: report.name,
                    params: report.params
                }).attempts(5).save();
            });

            setTimeout(getMoreJobs, 60 * 60 * 1000);
        });
    }

    //Create the jobs
    getMoreJobs();

    console.log('Starting ', numCPUs, ' workers');
    for (var i = 0; i < numCPUs; i++) {
        cluster.fork();
    }

    cluster.on('death', function(worker) {
        console.log('worker pid:' + worker.pid + ' died!'.bold.red);
    });

} else {
    //Process the jobs
    console.log('Starting worker pid:' + process.pid);
    jobs.process('reports', 20, function(job, done){
        //completing my work here
        veryHardWorkGeneratingReports(function(err) {
            if (err) return done(err);
            return done();
        });
    });
}

需要更多信息,代码或其他内容... - Teemu
2个回答

3
你的一个问题的答案是,Kue将其从Redis队列中弹出的作业放入“活动”状态,除非你寻找它们,否则你永远不会得到它们。
另一个问题的答案是,你的分布式工作队列是任务的消费者,而不是生产者。虽然像你现在这样混合使用它们是可以的,但这是一个模糊的范例。我用Kue做的是为Kue的JSON API制作一个包装器,这样就可以从系统中的任何地方将作业放入队列中。由于你似乎需要把作业放进去,我建议编写一个单独的生产者应用程序,它只会获取外部作业并将它们放入你的Kue工作队列中。它可以监视工作队列,以便在作业运行较少时加载一批作业,或者像我所做的那样,尽可能快地将作业放进去,并启动多个消费者应用程序实例以更快地处理负载。
再次强调:你在这里的关注点分离不太好。你应该有一个完全独立于任务消费者应用程序的任务生产者。这给你更大的灵活性、易于扩展(只需在另一台机器上启动另一个消费者即可扩展!)和整体代码管理的便利性。如果可能的话,你还应该允许给你这些任务的人访问你的Kue服务器的JSON API,而不是去寻找它们。作业生产者可以使用Kue安排自己的任务。

2
(是的,我知道我正在回答一个旧问题,但我致力于尽可能让SE的问答内容正确无误) - Nathan C. Tresch

2

请看https//github.com/LearnBoost/kue

在json.js脚本中检查64-112行。您将找到返回包含作业的对象的方法,还可以通过类型、状态或ID范围进行过滤(jobRange()jobStateRange()jobTypeRange())。

向下滚动主页至JSON API部分,您将找到返回对象的示例。

如何调用和使用这些方法,您比我更清楚。

如果传递未知关键字,则jobs.create()将失败。我会创建一个函数来检查forEach-循环中的当前作业,并返回一个关键字。然后,只需在jobs.create()参数中调用此函数而不是文字关键字即可。

通过json.js中的这些方法获取的信息也许有助于您创建“moreJobToDo”事件。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接