Spring Boot处理TCP/IP服务器

7

需要通过以太网连接实现处理以下协议的服务器:

Establishing a connection
The client connects to the configured server via TCP / IP.
After the connection has been established, the client initially sends a heartbeat message to the
Server:

{
  "MessageID": "Heartbeat"
}

Response:
{
  "ResponseCode": "Ok"
}

Communication process
To maintain the connection, the client sends every 10 seconds when inactive
Heartbeat message.
Server and client must close the connection if they are not receiving a message for longer than 20 seconds.
An answer must be given within 5 seconds to request. 
If no response is received, the connection must also be closed.
The protocol does not contain numbering or any other form of identification.
Communication partner when sending the responses makes sure that they are in the same sequence.

Message structure:
The messages are embedded in an STX-ETX frame.
STX (0x02) message ETX (0x03)
An `escaping` of STX and ETX within the message is not necessary since it is in JSON format

Escape sequence are following:

JSON.stringify({"a": "\x02\x03\x10"}) → "{"a": "\u0002\u0003\u0010"}"

不仅应该使用心跳消息。一个典型的消息应该是这样的:

{
  "MessageID": "CheckAccess"
  "Parameters": {
    "MediaType": "type",
    "MediaData": "data"
  }
} 

相应的回应:

{
  "ResponseCode":   "some-code",
  "DisplayMessage": "some-message",
  "SessionID":      "some-id"
}

服务器应该是多客户端的,协议本身没有任何标识。
然而,我们至少需要识别客户端发送请求的IP地址。

在Spring Boot应用程序中添加这样的服务器并在启动时启用以及处理其输入和输出逻辑,目前还没有找到解决方案。
欢迎提出任何建议。


解决方案

为TCP服务器配置如下:

@Slf4j
@Component
@RequiredArgsConstructor
public class TCPServer {
    private final InetSocketAddress hostAddress;
    private final ServerBootstrap serverBootstrap;

    private Channel serverChannel;

    @PostConstruct
    public void start() {
        try {
            ChannelFuture serverChannelFuture = serverBootstrap.bind(hostAddress).sync();
            log.info("Server is STARTED : port {}", hostAddress.getPort());

            serverChannel = serverChannelFuture.channel().closeFuture().sync().channel();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @PreDestroy
    public void stop() {
        if (serverChannel != null) {
            serverChannel.close();
            serverChannel.parent().close();
        }
    }
}

@PostConstruct会在应用程序启动时启动服务器。

还有它的配置:

@Configuration
@RequiredArgsConstructor
@EnableConfigurationProperties(NettyProperties.class)
public class NettyConfiguration {

    private final LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
    private final NettyProperties nettyProperties;

    @Bean(name = "serverBootstrap")
    public ServerBootstrap bootstrap(SimpleChannelInitializer initializer) {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup(), workerGroup())
                .channel(NioServerSocketChannel.class)
                .handler(loggingHandler)
                .childHandler(initializer);
        bootstrap.option(ChannelOption.SO_BACKLOG, nettyProperties.getBacklog());
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, nettyProperties.isKeepAlive());
        return bootstrap;
    }

    @Bean(destroyMethod = "shutdownGracefully")
    public NioEventLoopGroup bossGroup() {
        return new NioEventLoopGroup(nettyProperties.getBossCount());
    }

    @Bean(destroyMethod = "shutdownGracefully")
    public NioEventLoopGroup workerGroup() {
        return new NioEventLoopGroup(nettyProperties.getWorkerCount());
    }

    @Bean
    @SneakyThrows
    public InetSocketAddress tcpSocketAddress() {
        return new InetSocketAddress(nettyProperties.getTcpPort());
    }
}

初始化逻辑:

@Component
@RequiredArgsConstructor
public class SimpleChannelInitializer extends ChannelInitializer<SocketChannel> {

    private final StringEncoder stringEncoder = new StringEncoder();
    private final StringDecoder stringDecoder = new StringDecoder();

    private final QrReaderProcessingHandler readerServerHandler;
    private final NettyProperties nettyProperties;

    @Override
    protected void initChannel(SocketChannel socketChannel) {
        ChannelPipeline pipeline = socketChannel.pipeline();

        pipeline.addLast(new DelimiterBasedFrameDecoder(1024 * 1024, Delimiters.lineDelimiter()));
        pipeline.addLast(new ReadTimeoutHandler(nettyProperties.getClientTimeout()));
        pipeline.addLast(stringDecoder);
        pipeline.addLast(stringEncoder);
        pipeline.addLast(readerServerHandler);
    }
}

属性配置:

@Getter
@Setter
@ConfigurationProperties(prefix = "netty")
public class NettyProperties {
    @NotNull
    @Size(min = 1000, max = 65535)
    private int tcpPort;

    @Min(1)
    @NotNull
    private int bossCount;

    @Min(2)
    @NotNull
    private int workerCount;

    @NotNull
    private boolean keepAlive;

    @NotNull
    private int backlog;

    @NotNull
    private int clientTimeout;
}

以下是 application.yml 文件的一部分示例:

netty:
  tcp-port: 9090
  boss-count: 1
  worker-count: 14
  keep-alive: true
  backlog: 128
  client-timeout: 20

处理程序非常简单。

在控制台上运行本地检查:

telnet localhost 9090

在这里可以正常工作。我希望对于客户端的访问也可以正常工作。

1个回答

6
由于该协议不是基于HTTP(与WebSocket不同,WebSocket首先依赖于HTTP),您唯一的选择是自己使用TCP服务器,并将其连接到Spring上下文中,以充分利用Spring。
Netty以低级TCP/IP通信而闻名,并且很容易在Spring应用程序中包装Netty服务器。
事实上,Spring Boot提供了一个Netty HTTP服务器,但这不是你需要的TCP communication server with Netty And SpringBoot项目是您所需要的简单而有效的示例。
从该项目中查看TCPServer,它使用Netty的ServerBootstrap来启动自定义TCP服务器。
一旦您拥有服务器,您可以将Netty编解码器Jackson或其他消息转换器与应用程序域数据的编组/解组相适应。
[更新-2020年7月17日] 针对问题的更新理解(HTTP和TCP请求均在同一端点终止),以下是更新的解决方案建议。
                ----> HTTP服务器(be_http)
               |
----> HAProxy -
               |
                ----> TCP服务器(be_tcp)
                
为使此解决方案生效,需要进行以下更改/添加:
  1. 在您现有的Spring Boot应用程序中添加基于Netty的监听器,或为TCP服务器创建单独的Spring Boot应用程序。假设此端点正在端口9090上侦听TCP流量。
  2. 添加HAProxy作为入口流量的终止端点。
  3. 配置HAProxy,以便将所有HTTP流量发送到现有的Spring Boot HTTP端点(称为be_http)的8080端口。
  4. 配置HAProxy,以便将所有非HTTP流量发送到新的TCP Spring Boot端点(称为be_tcp)的9090端口。
以下是足够的HAProxy配置。这些摘录与此问题相关,请根据正常HAProxy设置添加其他HAProxy指令:

           
listen 443
  mode tcp
  bind :443 name tcpsvr
  /* 添加其他常规指令 */
  tcp-request inspect-delay 1s
  tcp-request content accept if HTTP
  tcp-request content accept if !HTTP
  use-server be_http if HTTP
  use-server be_tcp if !HTTP
  /* 后端服务器定义 */
  server be_http 127.0.0.1:8080
  server be_tcp 127.0.0.1:9090 send-proxy

以下是特别有用的HAProxy文档链接。

  1. 从缓冲区内容中提取样本 - 第6层
  2. 预定义的ACL
  3. tcp-request inspect-delay
  4. tcp-request content

个人建议尝试和验证tcp-request inspect-delay,并根据实际需求进行调整,因为这可能会在最坏情况下添加请求延迟,其中已建立连接但尚无内容可用以评估请求是否为HTTP。

为了满足需要,至少需要从发送方获取IP地址来标识客户端,您可以选择在将其发送回后端时使用代理协议。我已经更新了上面的示例配置,以包括在be_tcp中的代理协议(添加了send_proxy)。 我还从be_http中删除了send_proxy,因为它对于spring boot不是必需的,相反,您可能会依赖于be_http后端的常规X-Forwarded-For头部。
在be_tcp后端中,您可以使用Netty的HAProxyMessage来使用sourceAddress() API获取实际源IP地址。 总的来说,这是一个可行的解决方案。 我自己使用了带有代理协议的HAProxy(在前端和后端都是),对于工作来说更加稳定。

如果Web套接字有一个Spring选项,那么配置将会是什么样子?我们发送不同的请求体到同一个端点。 - catch23
1
@nazar_art,就你的情况而言(据我所了解),该协议并非以http为基础。WebSocket使用HTTP作为其基础,握手的一部分是使用HTTP进行初始连接,然后将其升级为WebSocket。Spring已经支持长时间的Web套接字(一个例子),但这不是你需要的,因为你提到的协议概述不符合http。它是在原始套接字上交换PDU(协议数据单元)。 - Avnish
1
给定的示例(STOMP over websocket)使用以下层中的协议 - STOMP消息封装在WebSocket帧中,这些帧又依次搭载在HTTP上。有许多示例可以向您展示如何使用纯WebSocket。这是另一个示例,它不使用STOMP而是使用纯原始WebSocket。底线是,根据您在问题中分享的协议概述,WebSocket将无法在提供的场景中工作。如果我在问题或协议概述中遗漏了任何内容,请告诉我。 - Avnish
你理解得非常好。我只想知道当你有一个端点但发送不同的请求消息时,如何解决这种情况。 - catch23
1
这正是HAProxy的介绍目的。它能够在单个端点上接收两种协议(HTTP和原始TCP),检查有效载荷并相应地将其转发到两个后端之一(be_http或be_tcp)。 - Avnish
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接