本地主机上RabbitMQ连接突然关闭

3

我们在本地的开发/测试场景中遇到了与RabbitMQ突然断开连接的问题。在我们的开发环境中,每个开发人员的Windows 7机器上都安装了RabbitMQ,并通过Spring AMQP库使用Java客户端进行连接。一切正常工作了一段时间,但在某个时间点连接会断开,RabbitMQ日志中会出现以下消息:

=WARNING REPORT==== 4-Jan-2016::14:39:37 ===
closing AMQP connection <0.3731.0> (127.0.0.1:50792 -> 127.0.0.1:5672):
connection_closed_abruptly

在客户端日志中出现了以下异常:
04/01/2016 14:39:37.181 (AMQP Connection 127.0.0.1:5672) ERROR [CachingConnectionFactory] Channel shutdown: connection error
04/01/2016 14:39:38.188 (SimpleAsyncTaskExecutor-2) WARN  [SimpleMessageListenerContainer] Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:723)
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:713)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:571)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Unrecognized Windows Sockets error: 0: recv failed
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:152)
    at java.net.SocketInputStream.read(SocketInputStream.java:122)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:536)
    ... 1 more

在客户端启动时,我们创建了一个动态队列并将多个消费者附加到它上面。当连接断开时,队列将被删除,任何后续尝试重新创建它都会失败,并出现以下异常:
04/01/2016 14:39:38.239 (SimpleAsyncTaskExecutor-8) WARN  [BlockingQueueConsumer] Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[MY_TEST_QU]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:571)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:470)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1165)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:885)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:704)
    at com.sun.proxy.$Proxy77.queueDeclarePassive(Unknown Source)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:550)
    ... 3 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'MY_TEST_QU' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 12 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'MY_TEST_QU' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554)
    ... 1 more

我们在rabbitmq 3.4.x和3.5.x以及Spring AMQP 1.3.x和1.5.x中遇到了这个问题。有趣的是,在安装了独立服务器的QA和生产环境中从未发生过这种情况。
非常感谢您的帮助。
1个回答

1

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations() 在不再有队列监听时会失败。

这不是队列的重新创建,而只是 PassiveDeclarations。因此,如果您手动删除了“动态队列”,则还需要手动重新创建它。

我的意思是,如果您不遵循 RabbitAdmin Bean 的约定,除非手动重新创建,否则没有选择。在这种情况下,您无需担心监听器:当队列回来时,它将正确地重新连接。

更新

从版本 1.5 开始,Spring AMQP 提供了 ListenerContainerConsumerFailedEvent,该事件恰好在所有 attemptPassiveDeclarations() 尝试之后发出。因此,您可以捕获该 ApplicationEventstop() 您的监听容器,重新声明 queue 并再次启动容器。

如果您的队列不是 Bean,则不会自动重新声明。


我不知道为什么会断开连接,但是@artem说的是,如果将队列添加为一个bean并在上下文中添加RabbitAdmin,则当重新建立连接时,它将自动重新声明队列。请参见参考手册。或者,不要使用自动删除或排他队列。 - Gary Russell
我们使用RabbitAdmin来声明队列,但队列不是一个bean。以下是代码Queue tempQ = new Queue(myQueueName, false, true, true); amqpAdmin.declareQueue(tempQ);,并且此代码在启动时运行。 - user3739116
1
谢谢 @Artem 和 @Gary。 经过一番搜索,看起来在 Windows 7 上到本地主机的连接中断问题与启用了 IPv4 和 IPv6 并且同时拥有java.net.preferIPv4Stack=true有关,这是我们应用程序中其他东西所需要的。当我删除了 java.net.preferIPv4Stack 属性时,连接问题消失了,但我们仍然需要保留该属性。 因此,我们必须忍受连接问题并解决动态队列访问问题。 - user3739116
由于我们的监听器容器是一个bean,编写代码来捕获ApplicationEvent并停止、声明队列并启动有一点复杂。我们将采用@Gary提供的方法,在队列定义中使用auto-delete和exclusive false。 - user3739116

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接