如何在 Node.js 中重新连接后正确检查 RabbitMQ 通道是否已打开?

6

我正在使用 amqplib,尝试实现重新连接机制。然而,在连接重新建立后,我的通道似乎仍然关闭。我该如何解决这个问题?这是我的代码。

var pubQueue = [];
module.exports = {
  connect: function (callback) {
    var self = this;
    amqp.connect(config.queue.url, function(err, connection) {
      if (err) {
        console.error("[AMQP]", err.message);
        return setTimeout(module.exports.connect, 2000);
      }

      connection.on("error", function(err) {
        if (err.message !== "Connection closing") {
          console.error("[AMQP] conn error", err.message);
        }
      });

      connection.on("close", function() {
        console.error("[AMQP] reconnecting");
        return setTimeout(module.exports.connect, 2000);
      });

      connection.createChannel(function(err, ch) {
        console.log('connection is reestablished');
        self.channel = ch;
        return callback ? callback() : false;
      });
    });
  },

  publish: function (message, callback) {
    var self = this;
    var key = this.generateKey(message);
    var m = new Buffer(JSON.stringify(message));

    try {
      self.channel.assertExchange(config.queue.exchange, 'topic', {durable: false, nowait: true});
      self.channel.publish(config.queue.exchange, key, m, {nowait: true});
      return callback ? callback() : true;
    } catch(err) {
      pubQueue.push({key: key, m: m});
      console.log(err);
    }
  }
} 

connect()函数将在express应用程序启动后调用。但是publish函数将在每个请求时都被调用。这就是为什么我有一个pubQueue来存储丢失的消息。我还没有实现重新发送队列中消息的功能,但我先遇到了这个错误,我似乎无法理解它。

connection is reestablished
{ [IllegalOperationError: Channel closed]
  message: 'Channel closed',
  stack: 'IllegalOperationError: Channel closed\n    at Channel.}

顺便提一下,有一个名为wascally的库,建立在amqplib之上,它内置了这个功能:https://github.com/LeanKit-Labs/wascally - Derick Bailey
1个回答

0

当连接关闭时,将self.channel设置为null,并在publish()中,如果self.channel为null,则不执行任何操作(可能有更好的方法,但这演示了其能力)。

顺便说一下,由于connect需要一个回调参数,因此在Promise中包装setTimeout,并使用then()调用您的连接函数与回调参数,如下所示:

new Promise((resolve) => {
  setTimeout(resolve, 2000);
}).then(() => {
  module.exports.connect(callback);
});

因此,修改后的代码在启动时,如果没有RabbitMQ运行,则会重复尝试连接,直到RabbitMQ已启动。它开始重复发布消息,如果RabbitMQ停止,则优雅地尝试重新连接并停止发布。

工作代码包括一个用于测试的服务器s.js,它连接并重复调用发布:

const amqp = require("./so.js");

const doPublish = () => {
  amqp.publish("test", () => {
    console.log("publish complete");
  });
  // publish again
  setTimeout(doPublish, 2000);
};

amqp.connect(function () {
  doPublish();
})

您的代码的修订版本,so.js,如下:

var config = {
  queue: {
    url: "amqp://localhost/",
    exchange: 'so_exchange',
  },
};

var amqp = require("amqplib/callback_api");

var pubQueue = [];

module.exports = {

  reconnect: function (callback) {
    new Promise((resolve) => {
      setTimeout(resolve, 2000);
    }).then(() => {
      module.exports.connect(callback);
    });
  },

  connect: function (callback) {
    var self = this;
    amqp.connect(config.queue.url, function (err, connection) {
      if (err) {
        console.error("[AMQP]", err.message);
        return module.exports.reconnect(callback);
      }

      connection.on("error", function (err) {
        if (err.message !== "Connection closing") {
          console.error("[AMQP] conn error", err.message);
        }
      });

      connection.on("close", function () {
        console.error("[AMQP] reconnecting");
        self.channel = null;
        return module.exports.reconnect(callback);
      });

      connection.createChannel(function (err, ch) {
        console.log("connection is reestablished");
        self.channel = ch;
        return callback();
      });
    });
  },

  publish: function (message, callback) {
    var self = this;
    var key = 'test'; //this.generateKey(message);
    var m = Buffer.from(JSON.stringify(message));

    if (self.channel == null) return;

    try {
      self.channel.assertExchange(config.queue.exchange, "topic", {
        durable: false,
        nowait: true,
      });
      self.channel.publish(config.queue.exchange, key, m, { nowait: true });
      return callback();
    } catch (err) {
      pubQueue.push({ key: key, m: m });
      console.log(err);
    }
  },
};

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接