RabbitMQ - 通道关闭:连接错误(SpringXD反复关闭RabbitMQ连接。)

3

我在尝试弄清楚RabbitMQ和SpringXD之间发生了什么事情,但遗憾的是一夜都没有成功。

问题: SpringXD会反复关闭RabbitMQ连接,或者报告与通道缓存大小相关的警告。

来自SpringXD日志的片段(在流初始化/自动装配期间):

 2016-05-03T07:42:43+0200 1.3.0.RELEASE WARN
 DeploymentsPathChildrenCache-0 listener.SimpleMessageListenerContainer
 - CachingConnectionFactory's channelCacheSize can not be less than the 
 number of concurrentConsumers so it was reset to match: 4

...

 2016-05-03T07:54:17+0200 1.3.0.RELEASE ERROR AMQP Connection
 192.168.120.125:5672 connection.CachingConnectionFactory - Channel shutdown: connection error

 2016-05-03T17:38:58+0200 1.3.0.RELEASE ERROR AMQP Connection
 192.168.120.125:5672 connection.CachingConnectionFactory - Channel shutdown: connection error; protocol method:
 method<connection.close>(reply-code=504, reply-text=CHANNEL_ERROR - 
 second 'channel.open' seen, class-id=20, method-id=10)

来自RabbitMQ日志的片段:

 =WARNING REPORT==== 3-May-2016::08:08:09 === closing AMQP connection <0.22276.61> (192.168.120.125:59350 -> 192.168.120.125:5672): client
 unexpectedly closed TCP connection

 =ERROR REPORT==== 3-May-2016::08:08:11 === closing AMQP connection 0.15409.61> (192.168.120.125:58527 -> 192.168.120.125:5672):
 {writer,send_failed,{error,closed}}

状态阻塞错误很少发生

 =ERROR REPORT==== 3-May-2016::17:38:58 === Error on AMQP connection <0.20542.25> (192.168.120.125:59421 -> 192.168.120.125:5672, vhost:
'/', user: 'xd', state: blocked), channel 7: operation channel.open
caused a connection exception channel_error: "second 'channel.open'
 seen"

我的设置(6个节点)

- springxd 1.3.0 distributed (zookeeper)  
- RabbitMQ 3.6.0, Erlang R16B03-1 cluster


    ackMode:                   AUTO ## or NONE
    autoBindDLQ:               false
    backOffInitialInterval:    1000
    backOffMaxInterval:        10000
    backOffMultiplier:         2.0
    batchBufferLimit:          10000
    batchingEnabled:           false
    batchSize:                 200
    batchTimeout:              5000
    compress:                  false
    concurrency:               4
    deliveryMode:              NON_PERSISTENT ## or PERSISTENT
    durableSubscription:       false
    maxAttempts:               10
    maxConcurrency:            10
    prefix:                    xdbus.
    prefetch:                  1000
    replyHeaderPatterns:       STANDARD_REPLY_HEADERS,*
    republishToDLQ:            false
    requestHeaderPatterns:     STANDARD_REQUEST_HEADERS,*
    requeue:                   true
    transacted:                false
    txSize:                    1000

spring: rabbitmq:

addresses:
priv1:5672,priv2:5672,priv3:5672,
priv4:5672,priv5:5672,priv6:5672

adminAddresses:  
http://priv1:15672, http://priv2:15672, http://priv3:15672, http://priv4:15672, http://priv5:15672,http://priv6:15672

nodes: 
rabbit@priv1,rabbit@priv2,rabbit@priv3,
rabbit@priv4,rabbit@priv5,rabbit@priv6

username: xd
password: xxxx
virtual_host: /
useSSL: false

ha-xdbus策略:

 - ^xdbus\. all  
 - ha-mode: exactly
 - ha-params:   2 
 - queue-master-locator:    min-masters

Rabbit配置文件

[
 {rabbit, 
[
     {tcp_listeners, [5672]},
     {queue_master_locator, "min-masters"}
]
}
].

当 ackMode 为 NONE 时,会发生以下情况:
最终消费者数量降至零,我拥有了僵尸流,无法从该状态恢复,反过来会导致不必要的排队。
当 ackMode 为 AUTO 时,会发生以下情况:
一些消息永远未被确认。
SpringXD 流和持久队列
Rabbit 模块被用作源或汇,没有自定义自动装配。
典型的流定义如下:
摄入:
event_generator | rabbit --mappedRequestHeaders=XDRoutingKey --routingKey='headers[''XDRoutingKey'']'

处理/接收器:

rabbit --queues='xdbus.INQUEUE-A' | ENRICHMENT-PROCESSOR-A | elastic-sink
rabbit --queues='xdbus.INQUEUE-B' | ENRICHMENT-PROCESSOR-B | elastic-sink

xdbus.INQUEUE-xxx 是通过Rabbit管理界面手动创建的。(持久性)

全局统计信息(来自RabbitMQ管理员)

  • 连接数:190
  • 通道数:2263(可能存在通道缓存问题?)
  • 交换机数:20
  • 队列数:120
  • 消费者数:1850

最后:

我希望有人能回答一下配置有什么问题(我非常确定网络表现良好,因此没有与最大打开文件限制相关的问题)。

消息速率从每秒2K到最大30K不等,相对负载较小。

谢谢!

Ivan

1个回答

2
我们曾经看到在高速通道时,这种通道的不稳定性问题与 此类似
解决方法是增加通道缓存大小以避免高频次通道转移。当前不清楚不稳定性出现的位置,但我认为它并不在Spring AMQP 中。
然而,一个问题是 XD 不公开 channelCacheSize 属性。
上面链接中的答案提供了一种方法,通过替换总线配置 XML 文件来添加该属性。增加缓存大小解决了该用户的问题。
我们有一个未解决的 JIRA 问题,以公开该属性
如上所述,您需要将其放在xd配置目录下:xd/config/META-INF/spring-xd/bus/rabbit-bus.xml编辑: 可以使用总线扩展机制的技术...
$ cat xd/config/META-INF/spring-xd/bus/ext/cf.xml 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg ref="rabbitFactory" />
        <property name="addresses" value="${spring.rabbitmq.addresses}" />
        <property name="username" value="${spring.rabbitmq.username}" />
        <property name="password" value="${spring.rabbitmq.password}" />
        <property name="virtualHost" value="${spring.rabbitmq.virtual_host}" />
        <property name="channelCacheSize" value="${spring.rabbitmq.channelCacheSize:100}" />
    </bean>

</beans>

编辑:测试结果

使用1百万条消息预填充队列foo

    concurrency:               10
    prefetch:                  1000
    txSize:                    1000

.

xd:>stream create foo --definition "rin:rabbit --concurrency=10 --maxConcurrency=10 --prefetch=1000 --txSize=1000 | t1:transform | t2:transform | rout:rabbit --routingKey='''bar'''" --deploy
Created and deployed new stream 'foo'

因此,通过这个配置,我们最终得到了40个消费者。

我从总线上看到的发布通道从未超过29个,其中有10个发布者为接收器。

在不到5分钟的时间内,从foo传输到bar的消息数量达到了1m(通过xdbus.foo.0xdbus.foo.1xdbus.foo.2),共发布了4m条消息。

没有错误 - 但我的笔记本需要冷却一下 :D


问题是xd配置目录在哪里。($ SPRING-XD-ROOT / xd / config?)但我尝试过这个,但没有帮助。
if(this.getConnectionFactory()instanceof CachingConnectionFactory){   CachingConnectionFactory cf =(CachingConnectionFactory)getConnectionFactory();      if(cf.getCacheMode()== CacheMode.CHANNEL && cf.getChannelCacheSize()<this.concurrentConsumers){        cf.setChannelCacheSize(200);        /* cf.setChannelCacheSize(this.concurrentConsumers); * /
- Ivan Prostran
不行,你不能这样使用逻辑;你需要将答案中的准确文件放置在我上面描述的准确位置 - <xd root>/xd/config/META-INF/spring-xd/bus/rabbit-bus.xml。它会替换xd-dirt jar中现有的文件-该位置(/xd/config)较早地位于类路径上,所以dirt将获取总线XML配置的修改版本。 - Gary Russell
请注意,这将增加rabbit消息总线的缓存大小;对于rabbit sink,您需要将channel-cache-size添加到sink的rabbit.xml中的<rabbit:connection-factory /> - Gary Russell
我想知道为什么兔子下沉模块需要像你之前说的那样“channel-cache-size”,而兔子源则不需要。如果我没记错,两者都注入了rabbitConnectionFactory引用,而该引用是CachingConnectionFactory实例。无论如何,非常感谢您的帮助。当然,测试完成后我会通知您的。 - Ivan Prostran
我明白,但仍有一些配置不正确。请检查此命令 --> cat container-1-start.log | grep "com.rabbitmq.client.ShutdownSignalException: connection error" | wc -l 这只是集群中6个节点之一的结果,共计571个。 - Ivan Prostran
显示剩余19条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接