RabbitMQ 错误:[Errno 10054] 远程主机强制关闭了一个现有的连接

22

我正在使用Python中的Kombu来消费一个持久化的RabbitMQ队列。

在Windows上,只有一个消费者在消费队列。该消费者出现以下错误:

Traceback (most recent call last):
  File ".\consumer_windows.py", line 66, in <module>
    message.ack()
  File "C:\Users\Administrator\Anaconda2\lib\site-packages\kombu\message.py", line 88, in ack
    self.channel.basic_ack(self.delivery_tag)
  File "C:\Users\Administrator\Anaconda2\lib\site-packages\amqp\channel.py", line 1584, in basic_ack
    self._send_method((60, 80), args)
  File "C:\Users\Administrator\Anaconda2\lib\site-packages\amqp\abstract_channel.py", line 56, in _send_method
    self.channel_id, method_sig, args, content,
  File "C:\Users\Administrator\Anaconda2\lib\site-packages\amqp\method_framing.py", line 221, in write_method
    write_frame(1, channel, payload)
  File "C:\Users\Administrator\Anaconda2\lib\site-packages\amqp\transport.py", line 182, in write_frame
    frame_type, channel, size, payload, 0xce,
  File "C:\Users\Administrator\Anaconda2\lib\socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 10054] An existing connection was forcibly closed by the remote host

队列最多同时有500条消息。每条消息的大小很小,但是它是一个任务,需要花费长达10分钟的时间才能完成(通常每个消息需要少于5分钟)。

我尝试重新启动消费者、RabbitMQ服务器以及删除队列,但是错误仍然存在。

我看到了 这个问题,但是答案是来自2010年,我的rabbitmq.log中有不同的条目:

=ERROR REPORT==== 24-Apr-2016::08:26:20 ===
closing AMQP connection <0.6716.384> (192.168.X.X:59602 -> 192.168.Y.X:5672):
{writer,send_failed,{error,timeout}}

在 rabbitmq-sasl.log 中没有最近的事件。

为什么会出现这个错误,如何防止它发生?


2
你找到解决方案了吗? - Bob Jordan
很抱歉,@bmjjr不行。 - Greg
同时寻求解决方案。 - Saw-mon and Natalie
我遇到了类似的问题,仍在寻找解决方案。有人有解决方案吗? - Rabbi hasan
这个回答是否解决了你的问题?python:[Errno 10054] 远程主机强制关闭了一个现有的连接 - shahjapan
3个回答

1

我仍在寻找答案。同时,我重新启动与我的Rabbit服务器的连接:

while True:
    try:
​
        connection = pika.BlockingConnection(params)
        channel = connection.channel() # start a channel
        channel.queue_declare(queue=amqp_q, durable=True) # Declare a queue
        ...
​
    except pika.exceptions.ConnectionClosed:
        print('connection closed... and restarted')

0

我曾经遇到过MySQL服务器的同样问题,它是托管在...... 我明白了,如果我们长时间打开连接或长时间不修改,就会发生这种情况。 如果您的程序一直打开数据库或其他内容,直到整个程序运行,请以这样的方式打开数据库,写入所有内容并关闭并重复执行。

我不知道rabbitmq到底是什么,但我认为您所写的错误标题可能是由于这个原因引起的。


0

我使用纯PIKA库尝试通过Amazon MQ连接到Rabbitmq代理时,遇到了相同的错误。

当正确设置ssl配置时,问题得到解决。

请在此处检查完整的博客文章:https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/amazon-mq-rabbitmq-pika.html

我使用的核心代码片段:

定义Pika客户端:

import ssl
import pika

class BasicPikaClient:

    def __init__(self, rabbitmq_broker_id, rabbitmq_user, rabbitmq_password, region):

        # SSL Context for TLS configuration of Amazon MQ for RabbitMQ
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        ssl_context.set_ciphers('ECDHE+AESGCM:!ECDSA')

        url = f"amqps://{rabbitmq_user}:{rabbitmq_password}@{rabbitmq_broker_id}.mq.{region}.amazonaws.com:5671"
        parameters = pika.URLParameters(url)
        parameters.ssl_options = pika.SSLOptions(context=ssl_context)

        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()

生产者:

from basicClient import BasicPikaClient

class BasicMessageSender(BasicPikaClient):

    def declare_queue(self, queue_name, durable):
        print(f"Trying to declare queue({queue_name})...")
        self.channel.queue_declare(queue=queue_name, durable=durable)

    def send_message(self, exchange, routing_key, body):
        channel = self.connection.channel()
        channel.basic_publish(exchange=exchange,
                              routing_key=routing_key,
                              body=body)
        print(f"Sent message. Exchange: {exchange}, Routing Key: {routing_key}, Body: {body}")

    def close(self):
        self.channel.close()
        self.connection.close()

调用生产者:

# Initialize Basic Message Sender which creates a connection
# and channel for sending messages.
basic_message_sender = BasicMessageSender(
    credentials["broker_id"],
    credentials["username"],
    credentials['password'],
    credentials['region']
)

# Declare a queue
basic_message_sender.declare_queue("q_name", durable=True)

# Send a message to the queue.
basic_message_sender.send_message(exchange="", routing_key="q_name", body=b'Hello World 2!')

# Close connections.
basic_message_sender.close()

定义消费者:

class BasicMessageReceiver(BasicPikaClient):

    def get_message(self, queue):
        method_frame, header_frame, body = self.channel.basic_get(queue)
        if method_frame:
            print(method_frame, header_frame, body)
            self.channel.basic_ack(method_frame.delivery_tag)
            return method_frame, header_frame, body
        else:
            print('No message returned')

    def close(self):
        self.channel.close()
        self.connection.close()

调用消费者:

# Create Basic Message Receiver which creates a connection
# and channel for consuming messages.
basic_message_receiver = BasicMessageReceiver(
    credentials["broker_id"],
    credentials["username"],
    credentials['password'],
    credentials['region']
)

# Consume the message that was sent.
basic_message_receiver.get_message("q_name")

# Close connections.
basic_message_receiver.close()

希望以上内容有所帮助。 谢谢


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接