RabbitMQ客户端(DotNet Core)防止应用程序关闭

4

我有一段代码,使用RabbitMQ来管理一系列的作业。因此,我需要与RabbitMQ服务器建立连接和通道,以执行这些作业的相关操作。我使用以下方法将作业加入队列:

    public override void QueueJob(string qid, string jobId) {
        this.VerifyReadyToGo();

        this.CreateQueue(qid);
        byte[] messageBody = Encoding.UTF8.GetBytes(jobId);
        this.channel.BasicPublish(
            exchange: Exchange,
            routingKey: qid,
            body: messageBody,
            basicProperties: null
        );
        OLog.Debug($"Queued job {jobId} on {qid}");
    }

    public override string RetrieveJobID(string qid) {
        this.VerifyReadyToGo();

        this.CreateQueue(qid);

        BasicGetResult data = this.channel.BasicGet(qid, false);
        string jobData = Encoding.UTF8.GetString(data.Body);

        int addCount = 0;
        while (!this.jobWaitingAck.TryAdd(jobData, data.DeliveryTag)) {
            // try again.
            Thread.Sleep(10);
            if (addCount++ > 2) {
                throw new JobReceptionException("Failed to add job to waiting ack list.");
            }
        }
        OLog.Debug($"Found job {jobData} on queue {qid} with ackId {data.DeliveryTag}");
        return jobData;
    }

问题在于像Publish、Get或者Acknowledge这样的方法调用会创建一些后台线程,当通道和连接关闭时这些线程并不会关闭。这意味着测试通过并且操作成功完成,但是当应用程序尝试关闭时它会挂起并且永远无法完成。
以下是参考连接方法:
    public override void Connect() {
        if (this.Connected) {
            return;
        }
        this.factory = new ConnectionFactory {
            HostName = this.config.Hostname,
            Password = this.config.Password,
            UserName = this.config.Username,
            Port = this.config.Port,
            VirtualHost = VirtualHost
        };
        this.connection = this.factory.CreateConnection();
        this.channel = this.connection.CreateModel();
        this.channel.ExchangeDeclare(
            exchange: Exchange,
            type: "direct",
            durable: true
        );
    }

我该如何解决这个问题(RabbitMQ客户端防止应用程序退出)?
1个回答

5

我不知道为什么,但是这个Connect方法的更改使得差别很大:

    public override void Connect() {
        if (this.Connected) {
            return;
        }
        this.factory = new ConnectionFactory {
            HostName = this.config.Hostname,
            Password = this.config.Password,
            UserName = this.config.Username,
            Port = this.config.Port,
            UseBackgroundThreadsForIO = true
        };
        this.connection = this.factory.CreateConnection();
        this.channel = this.connection.CreateModel();
        this.channel.ExchangeDeclare(
            exchange: Exchange,
            type: "direct",
            durable: true
        );
    }

1
“后台线程不会保持托管执行环境运行。”(https://msdn.microsoft.com/en-us/library/h339syd0(v=vs.110).aspx)。猜测您的“UseBackgroundThreadsForIO”更改现在正在执行此操作。;) - Tung
2
我刚刚在这上面浪费了一整天。 为什么UseBackgroundThreadsForIO = true不是默认值? - hex

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接