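I am using HornetQConfigurationCustomizer to customize the embedded HornetQ server, because by default it only comes with an InVMConnectorFactory.
I have created some sample applications that illustrate this setup. In these samples, "ServerSend" is the server that produces messages and "ServerReceive" is the server that consumes them.
The pom.xml of both applications contains: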
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-hornetq</artifactId>
    </dependency>
    <dependency>
        <groupId>org.hornetq</groupId>
        <artifactId>hornetq-jms-server</artifactId>
    </dependency>
DemoHornetqServerSendApplication:
    @SpringBootApplication
    @EnableScheduling
    public class DemoHornetqServerSendApplication {

        @Autowired
        private JmsTemplate jmsTemplate;

        private @Value("${spring.hornetq.embedded.queues}") String testQueue;

        public static void main(String[] args) {
            SpringApplication.run(DemoHornetqServerSendApplication.class, args);
        }

        @Scheduled(fixedRate = 5000)
        private void sendMessage() {
            String message = "Timestamp from Server: " + System.currentTimeMillis();
            System.out.println("Sending message: " + message);
            jmsTemplate.convertAndSend(testQueue, message);
        }

        @Bean
        public HornetQConfigurationCustomizer hornetCustomizer() {
            return new HornetQConfigurationCustomizer() {
                @Override
                public void customize(Configuration configuration) {
                    String serverSendConnectorName = "server-send-connector";
                    String serverReceiveConnectorName = "server-receive-connector";

                    Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations();

                    Map<String, Object> params = new HashMap<String, Object>();
                    params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                    params.put(TransportConstants.PORT_PROP_NAME, "5445");
                    TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                    connectorConf.put(serverSendConnectorName, tc);

                    Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
                    tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
                    acceptors.add(tc);

                    params = new HashMap<String, Object>();
                    params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                    params.put(TransportConstants.PORT_PROP_NAME, "5446");
                    tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                    connectorConf.put(serverReceiveConnectorName, tc);

                    List<String> staticConnectors = new ArrayList<String>();
                    staticConnectors.add(serverReceiveConnectorName);
                    ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
                            "my-cluster", // name
                            "jms", // address
                            serverSendConnectorName, // connector name
                            500, // retry interval
                            true, // duplicate detection
                            true, // forward when no consumers
                            1, // max hops
                            1000000, // confirmation window size
                            staticConnectors,
                            true // allow direct connections only
                    );
                    configuration.getClusterConfigurations().add(conf);

                    AddressSettings setting = new AddressSettings();
                    setting.setRedistributionDelay(0);
                    configuration.getAddressesSettings().put("#", setting);
                }
            };
        }
    }
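The customizer above builds the same kind of host/port map several times, which makes it easy to mix up 5445 and 5446 between the two applications. A minimal sketch of a helper that could build those maps is shown here. `NettyParams` and `of` are hypothetical names, not part of HornetQ or Spring Boot, and the literal keys "host" and "port" are assumed to be the values behind `TransportConstants.HOST_PROP_NAME` and `TransportConstants.PORT_PROP_NAME`; in the real application the constants would be used directly.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of HornetQ: builds the parameter map
// that is passed to TransportConfiguration in the customizer above.
final class NettyParams {

    // Assumption: these literals match TransportConstants.HOST_PROP_NAME
    // and TransportConstants.PORT_PROP_NAME.
    static final String HOST_KEY = "host";
    static final String PORT_KEY = "port";

    private NettyParams() {
    }

    // Returns a fresh map per call, so a connector and an acceptor
    // never share (and accidentally mutate) the same instance.
    static Map<String, Object> of(String host, int port) {
        Map<String, Object> params = new HashMap<String, Object>();
        params.put(HOST_KEY, host);
        // The port is stored as a String, as in the original code.
        params.put(PORT_KEY, String.valueOf(port));
        return params;
    }
}
```

With such a helper, the connector for ServerSend would be created as `new TransportConfiguration(NettyConnectorFactory.class.getName(), NettyParams.of("localhost", 5445))`. This only reduces repetition; it does not change the cluster behaviour described below.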
application.properties (ServerSend):
spring.hornetq.mode=embedded
spring.hornetq.embedded.enabled=true
spring.hornetq.embedded.queues=jms.testqueue
spring.hornetq.embedded.cluster-password=password
DemoHornetqServerReceiveApplication:
    @SpringBootApplication
    @EnableJms
    public class DemoHornetqServerReceiveApplication {

        @Autowired
        private JmsTemplate jmsTemplate;

        private @Value("${spring.hornetq.embedded.queues}") String testQueue;

        public static void main(String[] args) {
            SpringApplication.run(DemoHornetqServerReceiveApplication.class, args);
        }

        @JmsListener(destination="${spring.hornetq.embedded.queues}")
        public void receiveMessage(String message) {
            System.out.println("Received message: " + message);
        }

        @Bean
        public HornetQConfigurationCustomizer hornetCustomizer() {
            return new HornetQConfigurationCustomizer() {
                @Override
                public void customize(Configuration configuration) {
                    String serverSendConnectorName = "server-send-connector";
                    String serverReceiveConnectorName = "server-receive-connector";

                    Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations();

                    Map<String, Object> params = new HashMap<String, Object>();
                    params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                    params.put(TransportConstants.PORT_PROP_NAME, "5446");
                    TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                    connectorConf.put(serverReceiveConnectorName, tc);

                    Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
                    tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
                    acceptors.add(tc);

                    params = new HashMap<String, Object>();
                    params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                    params.put(TransportConstants.PORT_PROP_NAME, "5445");
                    tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                    connectorConf.put(serverSendConnectorName, tc);

                    List<String> staticConnectors = new ArrayList<String>();
                    staticConnectors.add(serverSendConnectorName);
                    ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
                            "my-cluster", // name
                            "jms", // address
                            serverReceiveConnectorName, // connector name
                            500, // retry interval
                            true, // duplicate detection
                            true, // forward when no consumers
                            1, // max hops
                            1000000, // confirmation window size
                            staticConnectors,
                            true // allow direct connections only
                    );
                    configuration.getClusterConfigurations().add(conf);

                    AddressSettings setting = new AddressSettings();
                    setting.setRedistributionDelay(0);
                    configuration.getAddressesSettings().put("#", setting);
                }
            };
        }
    }
application.properties (ServerReceive):
spring.hornetq.mode=embedded
spring.hornetq.embedded.enabled=true
spring.hornetq.embedded.queues=jms.testqueue
spring.hornetq.embedded.cluster-password=password
After starting both applications, the log output shows the following:
ServerSend:
2015-04-09 11:11:58.471 INFO 7536 --- [ main] org.hornetq.core.server : HQ221000: live server is starting with configuration HornetQ Configuration (clustered=true,backup=false,sharedStore=true,journalDirectory=C:\Users\****\AppData\Local\Temp\hornetq-data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/largemessages,pagingDirectory=data/paging)
2015-04-09 11:11:58.501 INFO 7536 --- [ main] org.hornetq.core.server : HQ221045: libaio is not available, switching the configuration into NIO
2015-04-09 11:11:58.595 INFO 7536 --- [ main] org.hornetq.core.server : HQ221043: Adding protocol support CORE
2015-04-09 11:11:58.720 INFO 7536 --- [ main] org.hornetq.core.server : HQ221003: trying to deploy queue jms.queue.jms.testqueue
2015-04-09 11:11:59.568 INFO 7536 --- [ main] org.hornetq.core.server : HQ221020: Started Netty Acceptor version 4.0.13.Final localhost:5445
2015-04-09 11:11:59.593 INFO 7536 --- [ main] org.hornetq.core.server : HQ221007: Server is now live
2015-04-09 11:11:59.593 INFO 7536 --- [ main] org.hornetq.core.server : HQ221001: HornetQ Server version 2.4.5.FINAL (Wild Hornet, 124) [c139929d-d90f-11e4-ba2e-e58abf5d6944]
ServerReceive:
2015-04-09 11:12:04.401 INFO 4528 --- [ main] org.hornetq.core.server : HQ221000: live server is starting with configuration HornetQ Configuration (clustered=true,backup=false,sharedStore=true,journalDirectory=C:\Users\****\AppData\Local\Temp\hornetq-data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/largemessages,pagingDirectory=data/paging)
2015-04-09 11:12:04.410 INFO 4528 --- [ main] org.hornetq.core.server : HQ221045: libaio is not available, switching the configuration into NIO
2015-04-09 11:12:04.520 INFO 4528 --- [ main] org.hornetq.core.server : HQ221043: Adding protocol support CORE
2015-04-09 11:12:04.629 INFO 4528 --- [ main] org.hornetq.core.server : HQ221003: trying to deploy queue jms.queue.jms.testqueue
2015-04-09 11:12:05.545 INFO 4528 --- [ main] org.hornetq.core.server : HQ221020: Started Netty Acceptor version 4.0.13.Final localhost:5446
2015-04-09 11:12:05.578 INFO 4528 --- [ main] org.hornetq.core.server : HQ221007: Server is now live
2015-04-09 11:12:05.578 INFO 4528 --- [ main] org.hornetq.core.server : HQ221001: HornetQ Server version 2.4.5.FINAL (Wild Hornet, 124) [c139929d-d90f-11e4-ba2e-e58abf5d6944]
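To compare the two startup logs side by side, the fields that identify each broker can be pulled out of the relevant lines. The sketch below extracts the acceptor address from the HQ221020 line and the bracketed identifier at the end of the HQ221001 line. `StartupLogFields` and its methods are hypothetical names, and the regular expressions assume the exact log layout shown above. In the two logs above the acceptor ports differ (5445 and 5446), while the bracketed identifier is the same in both.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for reading the startup log lines shown above.
final class StartupLogFields {

    // Matches e.g. "HQ221020: Started Netty Acceptor version 4.0.13.Final localhost:5445"
    private static final Pattern ACCEPTOR = Pattern.compile(
            "HQ221020: Started Netty Acceptor version \\S+ (\\S+):(\\d+)");

    // Matches the bracketed identifier that ends the HQ221001 line.
    private static final Pattern SERVER_ID = Pattern.compile(
            "HQ221001: .*\\[([0-9a-fA-F-]+)\\]\\s*$");

    private StartupLogFields() {
    }

    // Returns "host:port" for an HQ221020 line, or null for any other line.
    static String acceptorAddress(String line) {
        Matcher m = ACCEPTOR.matcher(line);
        return m.find() ? m.group(1) + ":" + m.group(2) : null;
    }

    // Returns the bracketed identifier of an HQ221001 line, or null otherwise.
    static String serverId(String line) {
        Matcher m = SERVER_ID.matcher(line);
        return m.find() ? m.group(1) : null;
    }
}
```

Feeding each line of both logs through `acceptorAddress` and `serverId` and printing the non-null results gives a compact summary per broker.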
I see clustered=true in both outputs. If I remove the cluster configuration from the HornetQConfigurationCustomizer, it shows false, so the configuration must be having some effect.
Now, ServerSend shows this in its console output:
Sending message: Timestamp from Server: 1428574324910
Sending message: Timestamp from Server: 1428574329899
Sending message: Timestamp from Server: 1428574334904
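However, ServerReceive shows nothing. No messages are forwarded from ServerSend to ServerReceive.
I did some more testing and created two further Spring Boot applications (ClientSend and ClientReceive) which do not embed a HornetQ server and instead connect to a "native" server.
The pom.xml of both client applications contains: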
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-hornetq</artifactId>
    </dependency>
DemoHornetqClientSendApplication:
    @SpringBootApplication
    @EnableScheduling
    public class DemoHornetqClientSendApplication {

        @Autowired
        private JmsTemplate jmsTemplate;

        private @Value("${queue}") String testQueue;

        public static void main(String[] args) {
            SpringApplication.run(DemoHornetqClientSendApplication.class, args);
        }

        @Scheduled(fixedRate = 5000)
        private void sendMessage() {
            String message = "Timestamp from Client: " + System.currentTimeMillis();
            System.out.println("Sending message: " + message);
            jmsTemplate.convertAndSend(testQueue, message);
        }
    }
application.properties (ClientSend):
spring.hornetq.mode=native
spring.hornetq.host=localhost
spring.hornetq.port=5446
queue=jms.testqueue
DemoHornetqClientReceiveApplication:
    @SpringBootApplication
    @EnableJms
    public class DemoHornetqClientReceiveApplication {

        @Autowired
        private JmsTemplate jmsTemplate;

        private @Value("${queue}") String testQueue;

        public static void main(String[] args) {
            SpringApplication.run(DemoHornetqClientReceiveApplication.class, args);
        }

        @JmsListener(destination="${queue}")
        public void receiveMessage(String message) {
            System.out.println("Received message: " + message);
        }
    }
application.properties (ClientReceive):
spring.hornetq.mode=native
spring.hornetq.host=localhost
spring.hornetq.port=5445
queue=jms.testqueue
Now the console shows the following:
ServerReceive:
Received message: Timestamp from Client: 1428574966630
Received message: Timestamp from Client: 1428574971600
Received message: Timestamp from Client: 1428574976595
ClientReceive:
Received message: Timestamp from Server: 1428574969436
Received message: Timestamp from Server: 1428574974438
Received message: Timestamp from Server: 1428574979446
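If I run ServerSend for a while and then start ClientReceive, it also receives all the messages queued up to that point, which shows that messages are not disappearing somewhere or being consumed from elsewhere.
For completeness, I also pointed ClientSend at ServerSend and ClientReceive at ServerReceive, to see whether there was some issue with clustering and the InVM clients, but there was no output indicating that any message was received in either ClientReceive or ServerReceive.
So it seems that message delivery between each embedded broker and its directly connected external clients works fine, but no messages are forwarded between the brokers. After all this, the big question is: what is wrong with the setup that prevents messages from being forwarded within the cluster?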