我正在使用 amqplib
,尝试实现重新连接机制。然而,在连接重新建立后,我的通道似乎仍然关闭。我该如何解决这个问题?这是我的代码。
var pubQueue = [];
module.exports = {
connect: function (callback) {
var self = this;
amqp.connect(config.queue.url, function(err, connection) {
if (err) {
console.error("[AMQP]", err.message);
return setTimeout(module.exports.connect, 2000);
}
connection.on("error", function(err) {
if (err.message !== "Connection closing") {
console.error("[AMQP] conn error", err.message);
}
});
connection.on("close", function() {
console.error("[AMQP] reconnecting");
return setTimeout(module.exports.connect, 2000);
});
connection.createChannel(function(err, ch) {
console.log('connection is reestablished');
self.channel = ch;
return callback ? callback() : false;
});
});
},
publish: function (message, callback) {
var self = this;
var key = this.generateKey(message);
var m = new Buffer(JSON.stringify(message));
try {
self.channel.assertExchange(config.queue.exchange, 'topic', {durable: false, nowait: true});
self.channel.publish(config.queue.exchange, key, m, {nowait: true});
return callback ? callback() : true;
} catch(err) {
pubQueue.push({key: key, m: m});
console.log(err);
}
}
}
connect()
函数将在express应用程序启动后调用。但是publish
函数将在每个请求时都被调用。这就是为什么我有一个pubQueue
来存储丢失的消息。我还没有实现重新发送队列中消息的功能,但我先遇到了这个错误,我似乎无法理解它。
connection is reestablished
{ [IllegalOperationError: Channel closed]
message: 'Channel closed',
stack: 'IllegalOperationError: Channel closed\n at Channel.}