在Heroku上重启NodeJS工作进程后恢复排队的任务

9

我在 Heroku 上搭建了一个相当简单的系统。我使用 RabbitMQ 处理后台工作。我的设置包括使用 Heroku Scheduler 插件每天运行一个节点脚本。该脚本将作业添加到队列中,工作者进而消耗它们并委派给一个独立模块进行处理。

问题是,在我随机收到 SIGTERM 事件之后,Heroku 突然重新启动实例之前,就开始出现问题了。

由于某种原因,在实例重新启动之后,工作者再也无法恢复正常工作。只有当我手动执行 heroku ps:scale worker=0heroku ps:scale worker=1 命令来重新启动它时,工作者才会继续消耗待处理的任务。

这是我的工作者代码:

// worker.js
var throng = require('throng');
var jackrabbit = require('jackrabbit');
var logger = require('logfmt');
var syncService = require('./syncService');

var start = function () {
    var queue = jackrabbit(process.env.RABBITMQ_BIGWIG_RX_URL || 'amqp://localhost');

    logger.log({type: 'msg', msg: 'start', service: 'worker'});

     queue
        .default()
        .on('drain', onDrain)
        .queue({name: 'syncUsers'})
        .consume(onMessage)

    function onMessage(data, ack, nack) {

        var promise;
        switch (data.type) {
            case 'updateUser':
                promise = syncService.updateUser(data.target, data.source);
                break;
            case 'createUser':
                promise = syncService.createUser(data.source);
                break;
            case 'deleteUser':
                promise = syncService.deleteUser(data.target);
        }

        promise.then(ack, nack);
    }

    function onDrain() {
        queue.close();
        logger.log({type: 'info', msg: 'sync complete', service:    'worker'});
     }

    process.on('SIGTERM', shutdown);


    function shutdown() {
        logger.log({type: 'info', msg: 'shutting down'});
        queue.close();
        process.exit();
    }

};


throng({
    workers: 1,
    lifetime: Infinity,
    grace: 4000
}, start);

在Heroku发生SIGTERM后,您在日志文件中看到了什么?您是否收到“正在关闭”消息?在RMQ方面呢 - 在SIGTERM关闭Heroku进程后,RMQ是否显示处于“未确认”状态的消息?您是否仍然看到打开的连接和通道? - Derick Bailey
没有“关闭”消息,只有“使用SIGTERM停止所有进程”,然后“进程以状态0退出”。 - yohairosen
1个回答

1
< p > < code > close() 方法适用于 < code > jackrabbit 对象需要回调函数,在回调函数完成前应避免退出进程:

function shutdown() {
    logger.log({type: 'info', msg: 'shutting down'});
    queue.close(function (e) {
      process.exit(e ? 1 : 0);
    });
}

谢谢。你能解释一下这与为什么RabbitMQ无法恢复处理剩余作业有关系吗? - yohairosen

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接