我在尝试弄清楚RabbitMQ和SpringXD之间发生了什么事情,但遗憾的是一夜都没有成功。
问题: SpringXD会反复关闭RabbitMQ连接,或者报告与通道缓存大小相关的警告。
来自SpringXD日志的片段(在流初始化/自动装配期间):
2016-05-03T07:42:43+0200 1.3.0.RELEASE WARN
DeploymentsPathChildrenCache-0 listener.SimpleMessageListenerContainer
- CachingConnectionFactory's channelCacheSize can not be less than the
number of concurrentConsumers so it was reset to match: 4
...
2016-05-03T07:54:17+0200 1.3.0.RELEASE ERROR AMQP Connection
192.168.120.125:5672 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-05-03T17:38:58+0200 1.3.0.RELEASE ERROR AMQP Connection
192.168.120.125:5672 connection.CachingConnectionFactory - Channel shutdown: connection error; protocol method:
method<connection.close>(reply-code=504, reply-text=CHANNEL_ERROR -
second 'channel.open' seen, class-id=20, method-id=10)
来自RabbitMQ日志的片段:
=WARNING REPORT==== 3-May-2016::08:08:09 === closing AMQP connection <0.22276.61> (192.168.120.125:59350 -> 192.168.120.125:5672): client
unexpectedly closed TCP connection
=ERROR REPORT==== 3-May-2016::08:08:11 === closing AMQP connection 0.15409.61> (192.168.120.125:58527 -> 192.168.120.125:5672):
{writer,send_failed,{error,closed}}
状态阻塞错误很少发生
=ERROR REPORT==== 3-May-2016::17:38:58 === Error on AMQP connection <0.20542.25> (192.168.120.125:59421 -> 192.168.120.125:5672, vhost:
'/', user: 'xd', state: blocked), channel 7: operation channel.open
caused a connection exception channel_error: "second 'channel.open'
seen"
我的设置(6个节点)
- springxd 1.3.0 distributed (zookeeper)
- RabbitMQ 3.6.0, Erlang R16B03-1 cluster
ackMode: AUTO ## or NONE
autoBindDLQ: false
backOffInitialInterval: 1000
backOffMaxInterval: 10000
backOffMultiplier: 2.0
batchBufferLimit: 10000
batchingEnabled: false
batchSize: 200
batchTimeout: 5000
compress: false
concurrency: 4
deliveryMode: NON_PERSISTENT ## or PERSISTENT
durableSubscription: false
maxAttempts: 10
maxConcurrency: 10
prefix: xdbus.
prefetch: 1000
replyHeaderPatterns: STANDARD_REPLY_HEADERS,*
republishToDLQ: false
requestHeaderPatterns: STANDARD_REQUEST_HEADERS,*
requeue: true
transacted: false
txSize: 1000
spring: rabbitmq:
addresses:
priv1:5672,priv2:5672,priv3:5672,
priv4:5672,priv5:5672,priv6:5672
adminAddresses:
http://priv1:15672, http://priv2:15672, http://priv3:15672, http://priv4:15672, http://priv5:15672,http://priv6:15672
nodes:
rabbit@priv1,rabbit@priv2,rabbit@priv3,
rabbit@priv4,rabbit@priv5,rabbit@priv6
username: xd
password: xxxx
virtual_host: /
useSSL: false
ha-xdbus策略:
- ^xdbus\. all
- ha-mode: exactly
- ha-params: 2
- queue-master-locator: min-masters
Rabbit配置文件
[
{rabbit,
[
{tcp_listeners, [5672]},
{queue_master_locator, "min-masters"}
]
}
].
当 ackMode 为 NONE 时,会发生以下情况:
最终消费者数量降至零,我拥有了僵尸流,无法从该状态恢复,反过来会导致不必要的排队。
当 ackMode 为 AUTO 时,会发生以下情况:
一些消息永远未被确认。
SpringXD 流和持久队列
Rabbit 模块被用作源或汇,没有自定义自动装配。
典型的流定义如下:
摄入:
event_generator | rabbit --mappedRequestHeaders=XDRoutingKey --routingKey='headers[''XDRoutingKey'']'
处理/接收器:
rabbit --queues='xdbus.INQUEUE-A' | ENRICHMENT-PROCESSOR-A | elastic-sink
rabbit --queues='xdbus.INQUEUE-B' | ENRICHMENT-PROCESSOR-B | elastic-sink
xdbus.INQUEUE-xxx 是通过Rabbit管理界面手动创建的。(持久性)
全局统计信息(来自RabbitMQ管理员)
- 连接数:190
- 通道数:2263(可能存在通道缓存问题?)
- 交换机数:20
- 队列数:120
- 消费者数:1850
最后:
我希望有人能回答一下配置有什么问题(我非常确定网络表现良好,因此没有与最大打开文件限制相关的问题)。
消息速率从每秒2K到最大30K不等,相对负载较小。
谢谢!
Ivan
<xd root>/xd/config/META-INF/spring-xd/bus/rabbit-bus.xml
。它会替换xd-dirt jar中现有的文件-该位置(/xd/config
)较早地位于类路径上,所以dirt将获取总线XML配置的修改版本。 - Gary Russellchannel-cache-size
添加到sink的rabbit.xml中的<rabbit:connection-factory />
。 - Gary Russell