Spring Integration:TCP客户端/服务器每个服务器连接打开一个客户端连接

3
我正在尝试使用Spring Integration实现TCP客户端/服务器应用程序,其中我需要为每个传入的TCP服务器连接打开一个TCP客户端套接字。
基本上,我有一堆IoT设备通过原始TCP套接字与后端服务器通信。我需要在系统中实现额外的功能。但是设备和服务器上的软件都是闭源的,因此我无法对它们进行任何更改。所以我的想法是在设备和服务器之间放置中间件来拦截这个客户端/服务器通信并提供附加功能。
我正在使用TcpNioServerConnectionFactory和TcpNioClientConnectionFactory与入站/出站通道适配器向所有方发送/接收消息。但是,在消息结构中没有任何信息将消息绑定到特定的设备;因此,每当新设备上的新连接进入服务器套接字时,我必须打开到后端的新客户端套接字。此客户端连接必须绑定到该特定服务器套接字的生命周期。它决不能被重复使用,如果这个客户端套接字(从后端到中间件)由于任何原因终止,则服务器套接字(从中间件到设备)也必须关闭。我该怎么做呢?
编辑:我最初的想法是子类化AbstractClientConnectionFactory,但它似乎除了在请求时提供客户端连接之外什么也没做。我是否应该查找子类化入站/出站通道适配器或其他地方?我还应该提到,我也可以接受非Spring集成解决方案,例如Apache Camel,甚至是使用原始NIO套接字的自定义解决方案。
编辑2:当我切换到TcpNetServerConnectionFactory并使用ThreadAffinityClientConnectionFactory包装客户端工厂时,我完成了一半,并且设备可以正常访问后端。但是,当后端发送回复时,我会收到错误“无法为GenericMessage找到出站套接字”,然后客户端套接字就会终止。我认为这是因为后端没有必要的头文件来正确路由消息。我该如何获取此信息?我的配置类如下:
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class ServerConfiguration {

    @Bean
    public AbstractServerConnectionFactory serverFactory() {
        AbstractServerConnectionFactory factory = new TcpNetServerConnectionFactory(8000);
        factory.setSerializer(new MapJsonSerializer());
        factory.setDeserializer(new MapJsonSerializer());
        return factory;
    }

    @Bean
    public AbstractClientConnectionFactory clientFactory() {
        AbstractClientConnectionFactory factory = new TcpNioClientConnectionFactory("localhost", 3333);
        factory.setSerializer(new MapJsonSerializer());
        factory.setDeserializer(new MapJsonSerializer());
        factory.setSingleUse(true);
        return new ThreadAffinityClientConnectionFactory(factory);
    }

    @Bean
    public TcpReceivingChannelAdapter inboundDeviceAdapter(AbstractServerConnectionFactory connectionFactory) {
        TcpReceivingChannelAdapter inbound = new TcpReceivingChannelAdapter();
        inbound.setConnectionFactory(connectionFactory);
        return inbound;
    }

    @Bean
    public TcpSendingMessageHandler outboundDeviceAdapter(AbstractServerConnectionFactory connectionFactory) {
        TcpSendingMessageHandler outbound = new TcpSendingMessageHandler();
        outbound.setConnectionFactory(connectionFactory);
        return outbound;
    }

    @Bean
    public TcpReceivingChannelAdapter inboundBackendAdapter(AbstractClientConnectionFactory connectionFactory) {
        TcpReceivingChannelAdapter inbound = new TcpReceivingChannelAdapter();
        inbound.setConnectionFactory(connectionFactory);
        return inbound;
    }

    @Bean
    public TcpSendingMessageHandler outboundBackendAdapter(AbstractClientConnectionFactory connectionFactory) {
        TcpSendingMessageHandler outbound = new TcpSendingMessageHandler();
        outbound.setConnectionFactory(connectionFactory);
        return outbound;
    }

    @Bean
    public IntegrationFlow backendIntegrationFlow() {
        return IntegrationFlows.from(inboundBackendAdapter(clientFactory()))
                .log(LoggingHandler.Level.INFO)
                .handle(outboundDeviceAdapter(serverFactory()))
                .get();
    }

    @Bean
    public IntegrationFlow deviceIntegrationFlow() {
        return IntegrationFlows.from(inboundDeviceAdapter(serverFactory()))
                .log(LoggingHandler.Level.INFO)
                .handle(outboundBackendAdapter(clientFactory()))
                .get();
    }
}
1个回答

3

不是很清楚你的问题,我假设你想要在客户端和服务器之间使用Spring集成代理。类似于:

iot-device -> spring server -> message-transformation -> spring client -> back-end-server

如果是这种情况,您可以实现一个包装标准工厂的ClientConnectionIdAware客户端连接工厂。
在集成流中,将消息中传入的ip_connectionId头绑定到线程上(使用ThreadLocal)。
然后,在客户端连接工厂中,使用ThreadLocal值在Map中查找相应的输出连接;如果未找到(或已关闭),则创建一个新连接并将其存储在Map中以备将来重用。
实现一个ApplictionListener(或@EventListener)来监听从服务器连接工厂发出的TcpConnectionCloseEvent,并关闭相应的出站连接。
这听起来像是一个很棒的增强功能,所以请考虑将其贡献回框架。
编辑:
版本5.0添加了ThreadAffinityClientConnectionFactory,它将与TcpNetServerConnectionFactory一起开箱即用,因为每个连接都有自己的线程。
对于TcpNioServerConnectionFactory,您需要额外的逻辑来为每个请求动态绑定连接到线程。
@SpringBootApplication
public class So51200675Application {

    public static void main(String[] args) {
        SpringApplication.run(So51200675Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner() {
        return args -> {
            Socket socket = SocketFactory.getDefault().createSocket("localhost", 1234);
            socket.getOutputStream().write("foo\r\n".getBytes());
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            System.out.println(reader.readLine());
            socket.close();
        };
    }

    @Bean
    public Map<String, String> fromToConnectionMappings() {
        return new ConcurrentHashMap<>();
    }

    @Bean
    public Map<String, String> toFromConnectionMappings() {
        return new ConcurrentHashMap<>();
    }

    @Bean
    public IntegrationFlow proxyInboundFlow() {
        return IntegrationFlows.from(Tcp.inboundAdapter(serverFactory()))
                .transform(Transformers.objectToString())
                .<String, String>transform(s -> s.toUpperCase())
                .handle((p, h) -> {
                    mapConnectionIds(h);
                    return p;
                })
                .handle(Tcp.outboundAdapter(threadConnectionFactory()))
                .get();
    }

    @Bean
    public IntegrationFlow proxyOutboundFlow() {
        return IntegrationFlows.from(Tcp.inboundAdapter(threadConnectionFactory()))
                .transform(Transformers.objectToString())
                .<String, String>transform(s -> s.toUpperCase())
                .enrichHeaders(e -> e
                        .headerExpression(IpHeaders.CONNECTION_ID, "@toFromConnectionMappings.get(headers['"
                                + IpHeaders.CONNECTION_ID + "'])").defaultOverwrite(true))
                .handle(Tcp.outboundAdapter(serverFactory()))
                .get();
    }

    private void mapConnectionIds(Map<String, Object> h) {
        try {
            TcpConnection connection = threadConnectionFactory().getConnection();
            String mapping = toFromConnectionMappings().get(connection.getConnectionId());
            String incomingCID = (String) h.get(IpHeaders.CONNECTION_ID);
            if (mapping == null || !(mapping.equals(incomingCID))) {
                System.out.println("Adding new mapping " + incomingCID + " to " + connection.getConnectionId());
                toFromConnectionMappings().put(connection.getConnectionId(), incomingCID);
                fromToConnectionMappings().put(incomingCID, connection.getConnectionId());
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Bean
    public ThreadAffinityClientConnectionFactory threadConnectionFactory() {
        return new ThreadAffinityClientConnectionFactory(clientFactory()) {

            @Override
            public boolean isSingleUse() {
                return false;
            }

        };
    }

    @Bean
    public AbstractServerConnectionFactory serverFactory() {
        return Tcp.netServer(1234).get();
    }

    @Bean
    public AbstractClientConnectionFactory clientFactory() {
        AbstractClientConnectionFactory clientFactory = Tcp.netClient("localhost", 1235).get();
        clientFactory.setSingleUse(true);
        return clientFactory;
    }

    @Bean
    public IntegrationFlow serverFlow() {
        return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1235)))
                .transform(Transformers.objectToString())
                .<String, String>transform(p -> p + p)
                .get();
    }

    @Bean
    public ApplicationListener<TcpConnectionCloseEvent> closer() {
        return e -> {
            if (fromToConnectionMappings().containsKey(e.getConnectionId())) {
                String key = fromToConnectionMappings().remove(e.getConnectionId());
                toFromConnectionMappings().remove(key);
                System.out.println("Removed mapping " + e.getConnectionId() + " to " + key);
                threadConnectionFactory().releaseConnection();
            }
        };
    }

}

编辑3

对于我来说,使用MapJsonSerializer没有问题。

@SpringBootApplication
public class So51200675Application {

    public static void main(String[] args) {
        SpringApplication.run(So51200675Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner() {
        return args -> {
            Socket socket = SocketFactory.getDefault().createSocket("localhost", 1234);
            socket.getOutputStream().write("{\"foo\":\"bar\"}\n".getBytes());
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            System.out.println(reader.readLine());
            socket.close();
        };
    }

    @Bean
    public Map<String, String> fromToConnectionMappings() {
        return new ConcurrentHashMap<>();
    }

    @Bean
    public Map<String, String> toFromConnectionMappings() {
        return new ConcurrentHashMap<>();
    }

    @Bean
    public MapJsonSerializer serializer() {
        return new MapJsonSerializer();
    }

    @Bean
    public IntegrationFlow proxyRequestFlow() {
        return IntegrationFlows.from(Tcp.inboundAdapter(serverFactory()))
                .<Map<String, String>, Map<String, String>>transform(m -> {
                    m.put("foo", m.get("foo").toUpperCase());
                    return m;
                })
                .handle((p, h) -> {
                    mapConnectionIds(h);
                    return p;
                })
                .handle(Tcp.outboundAdapter(threadConnectionFactory()))
                .get();
    }

    @Bean
    public IntegrationFlow proxyReplyFlow() {
        return IntegrationFlows.from(Tcp.inboundAdapter(threadConnectionFactory()))
                .<Map<String, String>, Map<String, String>>transform(m -> {
                    m.put("foo", m.get("foo").toLowerCase() + m.get("foo"));
                    return m;
                })
                .enrichHeaders(e -> e
                        .headerExpression(IpHeaders.CONNECTION_ID, "@toFromConnectionMappings.get(headers['"
                                + IpHeaders.CONNECTION_ID + "'])").defaultOverwrite(true))
                .handle(Tcp.outboundAdapter(serverFactory()))
                .get();
    }

    private void mapConnectionIds(Map<String, Object> h) {
        try {
            TcpConnection connection = threadConnectionFactory().getConnection();
            String mapping = toFromConnectionMappings().get(connection.getConnectionId());
            String incomingCID = (String) h.get(IpHeaders.CONNECTION_ID);
            if (mapping == null || !(mapping.equals(incomingCID))) {
                System.out.println("Adding new mapping " + incomingCID + " to " + connection.getConnectionId());
                toFromConnectionMappings().put(connection.getConnectionId(), incomingCID);
                fromToConnectionMappings().put(incomingCID, connection.getConnectionId());
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Bean
    public ThreadAffinityClientConnectionFactory threadConnectionFactory() {
        return new ThreadAffinityClientConnectionFactory(clientFactory()) {

            @Override
            public boolean isSingleUse() {
                return false;
            }

        };
    }

    @Bean
    public AbstractServerConnectionFactory serverFactory() {
        return Tcp.netServer(1234)
                .serializer(serializer())
                .deserializer(serializer())
                .get();
    }

    @Bean
    public AbstractClientConnectionFactory clientFactory() {
        AbstractClientConnectionFactory clientFactory = Tcp.netClient("localhost", 1235)
                .serializer(serializer())
                .deserializer(serializer())
                .get();
        clientFactory.setSingleUse(true);
        return clientFactory;
    }

    @Bean
    public IntegrationFlow backEndEmulatorFlow() {
        return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1235)
                    .serializer(serializer())
                    .deserializer(serializer())))
                .<Map<String, String>, Map<String, String>>transform(m -> {
                    m.put("foo", m.get("foo") + m.get("foo"));
                    return m;
                })
                .get();
    }

    @Bean
    public ApplicationListener<TcpConnectionCloseEvent> closer() {
        return e -> {
            if (fromToConnectionMappings().containsKey(e.getConnectionId())) {
                String key = fromToConnectionMappings().remove(e.getConnectionId());
                toFromConnectionMappings().remove(key);
                System.out.println("Removed mapping " + e.getConnectionId() + " to " + key);
                threadConnectionFactory().releaseConnection();
            }
        };
    }

}

添加新的映射 localhost:56998:1234:55c822a4-4252-45e6-9ef2-79263391f4be 至 localhost:1235:56999:3d520ca9-2f3a-44c3-b05f-e59695b8c1b0 {"foo":"barbarBARBAR"} 移除映射 localhost:56998:1234:55c822a4-4252-45e6-9ef2-79263391f4be 至 localhost:1235:56999:3d520ca9-2f3a-44c3-b05f-e59695b8c1b0


谢谢你的回答。是的,那正是我想表达的意思。我在想你是否是指 TcpNetClientConnectionFactory 而不是 TcpNetServerConnectionFactory - ozg
1
不,我的意思是如果入站工厂是“Net”,那么在ThreadAffinityClientConnectionFactory上将起作用,因为每个连接都有自己的读取器线程。对于(入站)NIO工厂,请求可能会在不同的线程上接收,因此需要一个增强的客户端工厂来处理该场景。如果不清楚,我可以明天组织一个PoC。 - Gary Russell
对于请求/响应消息传递,请使用网关而不是通道适配器,并且所有的关联处理将由网关自动完成。TcpInboundGatewayTcpOutboundGateway - Gary Russell
我选择了适配器对,因为它不总是请求/回复消息——客户端和服务器发送和接收消息而不期望回复。 - ozg
对我来说它运行得很好,你所说的“后端服务器无法监听1235端口”是什么意思?serverFlow只是为了模拟你真正的后端。 - Gary Russell
显示剩余11条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接