Spring Integration TCP客户端多连接

3

我使用Spring Integration的tcp-outbound-adapter和tcp-inbound-adapter通过TCP与第三方外部系统进行通信。

我使用的连接工厂类型为“客户端”,single-use="false",因为与外部系统的通信性质是数十个请求和响应的会话。 外部系统希望我会为每个会话打开一个新的TCP连接。

有没有办法使用Spring Integration实现这一点?

我的代码已成功地为一个这样的会话使用SI。但我希望我的系统能够打开几个这样的连接,以便处理多个并发会话。 目前,如果我向入站适配器发送新会话消息,则使用相同的TCP连接。

请帮忙解决。

更新:

当我们执行超过4个并发请求时,使用Gary给出的ThreadAffinity解决方案会出现此异常。任何想法为什么会这样?

11:08:02.083  [pool-1-thread-2] 193.xxx.yyy.zz:443:55729:46c71372-5933-4707-a27b-93cc4bf78c59 Message sent GenericMessage [payload=byte[326], headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2fb866, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2fb866, ip_tcp_remotePort=55718, ip_connectionId=127.0.0.1:55718:4444:7f71ce96-eaac-4b21-8b2c-bf736102f818, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=2dc3e330-d703-8a61-c46c-012233cadf6f, ip_hostname=127.0.0.1, timestamp=1481706480700}]
11:08:12.093  [pool-1-thread-2] Remote Timeout on 193.xxx.yyy.zz:443:55729:46c71372-5933-4707-a27b-93cc4bf78c59
11:08:12.093  [pool-1-thread-2] Tcp Gateway exception
org.springframework.integration.MessageTimeoutException: Timed out waiting for response
            at org.springframework.integration.ip.tcp.TcpOutboundGateway.handleRequestMessage(TcpOutboundGateway.java:146)
            at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
            at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
            at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
            at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
            at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
            at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
            at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
            at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
            at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
            at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
            at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
            at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292)
            at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212)
            at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129)
            at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
            at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
            at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
            at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
            at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
            at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
            at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
            at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
            at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:150)
            at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:45)
            at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:42)
            at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97)
            at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:441)
            at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:409)
            at org.springframework.integration.ip.tcp.TcpInboundGateway.doOnMessage(TcpInboundGateway.java:120)
            at org.springframework.integration.ip.tcp.TcpInboundGateway.onMessage(TcpInboundGateway.java:98)
            at org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorSupport.onMessage(TcpConnectionInterceptorSupport.java:159)
            at org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:182)
            at org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorSupport.run(TcpConnectionInterceptorSupport.java:111)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

1个回答

1
这取决于什么构成了“会话” - 如果客户端的所有请求都在单个线程上运行,您可以编写一个简单的连接工厂包装器,将连接存储在ThreadLocal中。您需要一些机制在最后一个请求之后调用工厂包装器来关闭连接并将其从ThreadLocal中删除。
如果会话的请求可以在多个线程上发生,那么它会变得有点复杂,但您仍然可以使用映射到连接实例的ThreadLocal来完成它。
编辑
这里是一个示例...
@SpringBootApplication
public class So40507731Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So40507731Application.class, args);
        MessageChannel channel = context.getBean("clientFlow.input", MessageChannel.class);
        MessagingTemplate template = new MessagingTemplate(channel);
        ThreadAffinityClientConnectionFactory affinityCF = context.getBean(ThreadAffinityClientConnectionFactory.class);
        ExecutorService exec = Executors.newCachedThreadPool();
        CountDownLatch latch = new CountDownLatch(2);
        exec.execute(() -> {
            String result = new String(template.convertSendAndReceive("foo", byte[].class));
            System.out.println(Thread.currentThread().getName() + " " + result);
            result = new String(template.convertSendAndReceive("foo", byte[].class));
            System.out.println(Thread.currentThread().getName() + " " + result);
            affinityCF.release();
            latch.countDown();
        });
        exec.execute(() -> {
            String result = new String(template.convertSendAndReceive("foo", byte[].class));
            System.out.println(Thread.currentThread().getName() + " " + result);
            result = new String(template.convertSendAndReceive("foo", byte[].class));
            System.out.println(Thread.currentThread().getName() + " " + result);
            affinityCF.release();
            latch.countDown();
        });
        latch.await(10, TimeUnit.SECONDS);
        context.close();
        exec.shutdownNow();
    }

    @Bean
    public TcpNetClientConnectionFactory delegateCF() {
        TcpNetClientConnectionFactory clientCF = new TcpNetClientConnectionFactory("localhost", 1234);
        clientCF.setSingleUse(true); // so each thread gets his own connection
        return clientCF;
    }

    @Bean
    public ThreadAffinityClientConnectionFactory affinityCF() {
        return new ThreadAffinityClientConnectionFactory(delegateCF());
    }

    @Bean
    public TcpOutboundGateway outGate() {
        TcpOutboundGateway outGate = new TcpOutboundGateway();
        outGate.setConnectionFactory(affinityCF());
        return outGate;
    }

    @Bean
    public IntegrationFlow clientFlow() {
        return f -> f.handle(outGate());
    }

    @Bean
    public TcpNetServerConnectionFactory serverCF() {
        return new TcpNetServerConnectionFactory(1234);
    }

    @Bean
    public TcpInboundGateway inGate() {
        TcpInboundGateway inGate = new TcpInboundGateway();
        inGate.setConnectionFactory(serverCF());
        return inGate;
    }

    @Bean
    public IntegrationFlow serverFlow() {
        return IntegrationFlows.from(inGate())
                .transform(Transformers.objectToString())
                .transform("headers['ip_connectionId'] + ' ' + payload")
                .get();
    }

    public static class ThreadAffinityClientConnectionFactory extends AbstractClientConnectionFactory
            implements TcpListener {

        private final AbstractClientConnectionFactory delegate;

        private final ThreadLocal<TcpConnectionSupport> connection = new ThreadLocal<>();

        public ThreadAffinityClientConnectionFactory(AbstractClientConnectionFactory delegate) {
            super("", 0);
            delegate.registerListener(this);
            this.delegate = delegate;
        }

        @Override
        protected TcpConnectionSupport obtainConnection() throws Exception {
            TcpConnectionSupport tcpConnection = this.connection.get();
            if (tcpConnection == null || !tcpConnection.isOpen()) {
                tcpConnection = this.delegate.getConnection();
                this.connection.set(tcpConnection);
            }
            return tcpConnection;
        }

        public void release() {
            TcpConnectionSupport connection = this.connection.get();
            if (connection != null) {
                connection.close();
                this.connection.remove();
            }
        }

        @Override
        public void start() {
            this.delegate.start();
            setActive(true);
            super.start();
        }

        @Override
        public void stop() {
            this.delegate.stop();
            setActive(false);
            super.stop();
        }

        @Override
        public boolean onMessage(Message<?> message) {
            return getListener().onMessage(message);
        }

    }

}

结果:

pool-2-thread-2 localhost:64559:1234:3d898822-ea91-421d-97f2-5f9620b9d369 foo
pool-2-thread-1 localhost:64560:1234:227f8a9f-1461-41bf-943c-68a56f708b0c foo
pool-2-thread-2 localhost:64559:1234:3d898822-ea91-421d-97f2-5f9620b9d369 foo
pool-2-thread-1 localhost:64560:1234:227f8a9f-1461-41bf-943c-68a56f708b0c foo

我猜包装类应该和CachingClientConnectionFactory类似?但是如何配置int-ip:tcp-connection-factory才能返回包装类呢? - Evyatar Kafkafi
请问您能否详细指导如何操作呢?我已经尝试了几种可能性,但都没有成功。看起来,在许多类中都存在使用一个共享连接的逻辑(在singleUse="false"的情况下)。非常感谢您的帮助。 - Evyatar Kafkafi
感谢您提供代码示例,非常有帮助。不过,您使用了singleUse=true,而我的用例有所不同:我有几个TCP客户端(在同一JVM中),都连接到同一个TCP服务器(外部系统),并且每个客户端上都有一个“对话”,其中包含20-150条双向消息。因此,我必须使用singleUse=false和2个Channel适配器。我扩展了您的代码示例,但是不得不将ConnectionFactoryFactoryBean复制粘贴到我的代码中,以创建自己的ConnectionFactory,因为它的某些方法无法被扩展。 - Evyatar Kafkafi
(如果您使用的是4.2或更高版本),请将上述评论应用。 - Gary Russell
我使用2个通道适配器(入站和出站),而不是像你的示例中一样使用一个网关适配器。我看到每个适配器都在单独的线程中运行。打开到TCP服务器的连接的线程将连接放入线程本地,但接收来自TCP服务器的响应的线程在线程本地没有此连接。您建议我像您的示例一样使用网关吗? - Evyatar Kafkafi
显示剩余9条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接