我正在尝试使用Spring Integration实现TCP客户端/服务器应用程序,其中我需要为每个传入的TCP服务器连接打开一个TCP客户端套接字。
基本上,我有一堆IoT设备通过原始TCP套接字与后端服务器通信。我需要在系统中实现额外的功能。但是设备和服务器上的软件都是闭源的,因此我无法对它们进行任何更改。所以我的想法是在设备和服务器之间放置中间件来拦截这个客户端/服务器通信并提供附加功能。
我正在使用TcpNioServerConnectionFactory和TcpNioClientConnectionFactory与入站/出站通道适配器向所有方发送/接收消息。但是,在消息结构中没有任何信息将消息绑定到特定的设备;因此,每当新设备上的新连接进入服务器套接字时,我必须打开到后端的新客户端套接字。此客户端连接必须绑定到该特定服务器套接字的生命周期。它决不能被重复使用,如果这个客户端套接字(从后端到中间件)由于任何原因终止,则服务器套接字(从中间件到设备)也必须关闭。我该怎么做呢?
编辑:我最初的想法是子类化AbstractClientConnectionFactory,但它似乎除了在请求时提供客户端连接之外什么也没做。我是否应该查找子类化入站/出站通道适配器或其他地方?我还应该提到,我也可以接受非Spring集成解决方案,例如Apache Camel,甚至是使用原始NIO套接字的自定义解决方案。
编辑2:当我切换到TcpNetServerConnectionFactory并使用ThreadAffinityClientConnectionFactory包装客户端工厂时,我完成了一半,并且设备可以正常访问后端。但是,当后端发送回复时,我会收到错误“无法为GenericMessage找到出站套接字”,然后客户端套接字就会终止。我认为这是因为后端没有必要的头文件来正确路由消息。我该如何获取此信息?我的配置类如下:
基本上,我有一堆IoT设备通过原始TCP套接字与后端服务器通信。我需要在系统中实现额外的功能。但是设备和服务器上的软件都是闭源的,因此我无法对它们进行任何更改。所以我的想法是在设备和服务器之间放置中间件来拦截这个客户端/服务器通信并提供附加功能。
我正在使用TcpNioServerConnectionFactory和TcpNioClientConnectionFactory与入站/出站通道适配器向所有方发送/接收消息。但是,在消息结构中没有任何信息将消息绑定到特定的设备;因此,每当新设备上的新连接进入服务器套接字时,我必须打开到后端的新客户端套接字。此客户端连接必须绑定到该特定服务器套接字的生命周期。它决不能被重复使用,如果这个客户端套接字(从后端到中间件)由于任何原因终止,则服务器套接字(从中间件到设备)也必须关闭。我该怎么做呢?
编辑:我最初的想法是子类化AbstractClientConnectionFactory,但它似乎除了在请求时提供客户端连接之外什么也没做。我是否应该查找子类化入站/出站通道适配器或其他地方?我还应该提到,我也可以接受非Spring集成解决方案,例如Apache Camel,甚至是使用原始NIO套接字的自定义解决方案。
编辑2:当我切换到TcpNetServerConnectionFactory并使用ThreadAffinityClientConnectionFactory包装客户端工厂时,我完成了一半,并且设备可以正常访问后端。但是,当后端发送回复时,我会收到错误“无法为GenericMessage找到出站套接字”,然后客户端套接字就会终止。我认为这是因为后端没有必要的头文件来正确路由消息。我该如何获取此信息?我的配置类如下:
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class ServerConfiguration {
@Bean
public AbstractServerConnectionFactory serverFactory() {
AbstractServerConnectionFactory factory = new TcpNetServerConnectionFactory(8000);
factory.setSerializer(new MapJsonSerializer());
factory.setDeserializer(new MapJsonSerializer());
return factory;
}
@Bean
public AbstractClientConnectionFactory clientFactory() {
AbstractClientConnectionFactory factory = new TcpNioClientConnectionFactory("localhost", 3333);
factory.setSerializer(new MapJsonSerializer());
factory.setDeserializer(new MapJsonSerializer());
factory.setSingleUse(true);
return new ThreadAffinityClientConnectionFactory(factory);
}
@Bean
public TcpReceivingChannelAdapter inboundDeviceAdapter(AbstractServerConnectionFactory connectionFactory) {
TcpReceivingChannelAdapter inbound = new TcpReceivingChannelAdapter();
inbound.setConnectionFactory(connectionFactory);
return inbound;
}
@Bean
public TcpSendingMessageHandler outboundDeviceAdapter(AbstractServerConnectionFactory connectionFactory) {
TcpSendingMessageHandler outbound = new TcpSendingMessageHandler();
outbound.setConnectionFactory(connectionFactory);
return outbound;
}
@Bean
public TcpReceivingChannelAdapter inboundBackendAdapter(AbstractClientConnectionFactory connectionFactory) {
TcpReceivingChannelAdapter inbound = new TcpReceivingChannelAdapter();
inbound.setConnectionFactory(connectionFactory);
return inbound;
}
@Bean
public TcpSendingMessageHandler outboundBackendAdapter(AbstractClientConnectionFactory connectionFactory) {
TcpSendingMessageHandler outbound = new TcpSendingMessageHandler();
outbound.setConnectionFactory(connectionFactory);
return outbound;
}
@Bean
public IntegrationFlow backendIntegrationFlow() {
return IntegrationFlows.from(inboundBackendAdapter(clientFactory()))
.log(LoggingHandler.Level.INFO)
.handle(outboundDeviceAdapter(serverFactory()))
.get();
}
@Bean
public IntegrationFlow deviceIntegrationFlow() {
return IntegrationFlows.from(inboundDeviceAdapter(serverFactory()))
.log(LoggingHandler.Level.INFO)
.handle(outboundBackendAdapter(clientFactory()))
.get();
}
}
TcpNetClientConnectionFactory
而不是TcpNetServerConnectionFactory
? - ozgTcpInboundGateway
和TcpOutboundGateway
。 - Gary RussellserverFlow
只是为了模拟你真正的后端。 - Gary Russell