Spring Boot SSL TCPClient ~ StompBrokerRelayMessageHandler ~ ActiveMQ ~ Undertow Spring Boot中使用SSL的TCP客户端与Stomp Broker Relay消息处理程序,结合ActiveMQ和Undertow。

8
我正在尝试构建一个基于Spring Websocket Demo的websocket消息应用程序,使用ActiveMQ作为STOMP消息代理,并使用Undertow运行。该应用程序在不安全的连接上运行良好。然而,我在配置STOMP Broker Relay以转发SSL连接时遇到了困难。
如Spring WebSocket文档中所述...
引用: “上述配置中的“STOMP broker relay”是一个Spring MessageHandler,它通过将消息转发到外部消息代理来处理消息。为此,它建立与代理的TCP连接,将所有消息转发到代理,然后通过WebSocket会话将从代理接收到的所有消息转发给客户端。基本上,它充当一个“中继器”,双向转发消息。”
进一步地,文档中说明了对 reactor-net 的依赖,我已经...

请添加对 org.projectreactor:reactor-net 的依赖,以进行 TCP 连接管理。

问题在于我的当前实现没有通过 SSL 初始化 NettyTCPClient,因此 ActiveMQ 连接会出现 SSLException 错误。


[r.i.n.i.n.t.NettyTcpClient:307] » CONNECTED: 
[id: 0xcfef39e9, /127.0.0.1:17779 => localhost/127.0.0.1:8442]
...
[o.a.a.b.TransportConnection.Transport:245] » 
Transport Connection to: tcp://127.0.0.1:17779 failed:
javax.net.ssl.SSLException: Unrecognized SSL message, plaintext connection?
...

因此,我尝试研究Project Reactor Docs以设置连接的SSL选项,但我没有成功。
目前,我发现StompBrokerRelayMessageHandler默认在Reactor2TcpClient中初始化NettyTCPClient,但似乎无法配置。
非常感谢您的帮助。 SSCCE

app.props

spring.activemq.in-memory=true
spring.activemq.pooled=false
spring.activemq.broker-url=stomp+ssl://localhost:8442
server.port=8443
server.ssl.enabled=true
server.ssl.protocol=tls
server.ssl.key-alias=undertow
server.ssl.key-store=classpath:undertow.jks
server.ssl.key-store-password=xxx
server.ssl.trust-store=classpath:undertow_certs.jks
server.ssl.trust-store-password=xxx

WebSocket配置

//... 
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    private static final Logger log = LoggerFactory.getLogger(WebSocketConfig.class);

    private final static String KEYSTORE = "/activemq.jks";
    private final static String KEYSTORE_PASS = "xxx";
    private final static String KEYSTORE_TYPE = "JKS";
    private final static String TRUSTSTORE = "/activemq_certs.jks";
    private final static String TRUSTSTORE_PASS = "xxx";

    private static String getBindLocation() {
        return "stomp+ssl://localhost:8442?transport.needClientAuth=false";
    }

    @Bean(initMethod = "start", destroyMethod = "stop")
    public SslBrokerService activeMQBroker() throws Exception {

        final SslBrokerService service = new SslBrokerService();
        service.setPersistent(false);

        KeyManager[] km = SecurityManager.getKeyManager();
        TrustManager[] tm = SecurityManager.getTrustManager();

        service.addSslConnector(getBindLocation(), km, tm, null);
        final ActiveMQTopic topic = new ActiveMQTopic("jms.topic.test");
        service.setDestinations(new ActiveMQDestination[]{topic});

        return service;
    }


    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/topic").setRelayHost("localhost").setRelayPort(8442);
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/welcome").withSockJS();
        registry.addEndpoint("/test").withSockJS();
    }

   private static class SecurityManager { 
   //elided...
   }

}

已解决,根据Rossens的建议。以下是实现细节,供有兴趣的人参考。


WebSocketConfig

@Configuration
public class WebSocketConfig extends DelegatingWebSocketMessageBrokerConfiguration {
    ...
    @Bean
    public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
      StompBrokerRelayMessageHandler handler = (StompBrokerRelayMessageHandler) super.stompBrokerRelayMessageHandler();
      ConfigurationReader reader = new StompClientDispatcherConfigReader();
      Environment environment = new Environment(reader).assignErrorJournal();
      TcpOperations<byte[]> client = new Reactor2TcpClient<>(new StompTcpClientSpecFactory(environment,"localhost", 8443));
      handler.setTcpClient(client);
      return handler;
    }
}

StompTCPClientSpecFactory
private static class StompTcpClientSpecFactory
        implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {

    private static final Logger log = LoggerFactory.getLogger(StompTcpClientSpecFactory.class);

    private final String host;
    private final int port;
    private final String KEYSTORE = "src/main/resources/tcpclient.jks";
    private final String KEYSTORE_PASS = "xxx";
    private final String KEYSTORE_TYPE = "JKS";
    private final String TRUSTSTORE = "/src/main/resources/tcpclient_certs.jks";
    private final String TRUSTSTORE_PASS = "xxx";
    private final String TRUSTSTORE_TYPE = "JKS";
    private final Environment environment;

    private final SecurityManager tcpManager = new SecurityManager
            .SSLBuilder(KEYSTORE, KEYSTORE_PASS)
            .keyStoreType(KEYSTORE_TYPE)
            .trustStore(TRUSTSTORE, TRUSTSTORE_PASS)
            .trustStoreType(TRUSTSTORE_TYPE)
            .build();

    public StompTcpClientSpecFactory(Environment environment, String host, int port) {
        this.environment = environment;
        this.host = host;
        this.port = port;
    }

    @Override
    public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(
            Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {

        return tcpClientSpec
                .ssl(new SslOptions()
                        .sslProtocol("TLS")
                        .keystoreFile(tcpManager.getKeyStore())
                        .keystorePasswd(tcpManager.getKeyStorePass())
                        .trustManagers(tcpManager::getTrustManager)
                        .trustManagerPasswd(tcpManager.getTrustStorePass()))
                .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
                .env(this.environment)
                .dispatcher(this.environment.getCachedDispatchers("StompClient").get())
                .connect(this.host, this.port);
    }
}

1
谁给这个点了踩,我真的想要一个解释... 没胆量。 - Edward J Beckett
内部允许这样做(https://github.com/reactor/reactor-io/blob/master/reactor-net/src/test/java/reactor/io/net/tcp/TcpServerTests.java#L100),但我正在检查它们是否已暴露。 - smaldini
4个回答

7
StompBrokerRelayMessageHandler有一个名为tcpClient的属性,您可以设置它。但是看起来我们没有通过WebSocketMessageBrokerConfigurer进行公开设置。

您可以删除@EnableWebSocketMessageBroker,并改为扩展DelegatingWebSocketMessageBrokerConfiguration。这实际上是相同的,但现在您直接从提供所有bean的配置类中进行扩展。

这样,您就可以覆盖stompBrokerRelayMessageHandler() bean,并直接设置其TcpClient属性。只需确保重载方法标记为@Bean


谢谢 Rossen...让我试试,先生。 - Edward J Beckett
只是想感谢你的帮助...花了我一些时间,但最终我终于让它工作了...你太棒了,伙计 :) - Edward J Beckett

4

@amoebob的回答很好,但线程没有正确关闭。每次客户端连接打开时,都会打开一个新线程并且从未关闭。我在生产环境中发现了这个问题,并花费了几天时间来解决它。因此,我建议您更改StompTcpFactory以改进线程重用:

import io.netty.channel.EventLoopGroup;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.stomp.Reactor2StompCodec;
import org.springframework.messaging.simp.stomp.StompDecoder;
import org.springframework.messaging.simp.stomp.StompEncoder;
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
import reactor.Environment;
import reactor.core.config.ReactorConfiguration;
import reactor.io.net.NetStreams;
import reactor.io.net.Spec;
import reactor.io.net.config.SslOptions;
import reactor.io.net.impl.netty.NettyClientSocketOptions;

public class StompTcpFactory implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {

  private final Environment environment;
  private final EventLoopGroup eventLoopGroup;
  private final String host;
  private final int port;
  private final boolean ssl;

  public StompTcpFactory(String host, int port, boolean ssl) {
    this.host = host;
    this.port = port;
    this.ssl = ssl;
    this.environment = new Environment(() -> new ReactorConfiguration(emptyList(), "sync", new Properties()));
    this.eventLoopGroup = Reactor2TcpClient.initEventLoopGroup();
  }

  @Override
  public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
    return tcpClientSpec
            .env(environment)
            .options(new NettyClientSocketOptions().eventLoopGroup(eventLoopGroup))
            .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
            .ssl(ssl ? new SslOptions() : null)
            .connect(host, port);
  }

}

我正在获取 Reactor2TcpClient.initEventLoopGroup(); 的私有访问权限,使用的是 Reactor 2.0.8-RELEASE - jordan.baucke
你应该在spring-messaging中的org.springframework.messaging.tcp.reactor中找到Reactor2TcpClient。我正在使用版本4.3.13 - Martin Choraine
1
不错的交易,明白了。我仍在使用Spring Boot 1.4.2,因此必须手动更新一些Spring Messaging的依赖版本。长命百岁! - jordan.baucke
1
运作得很顺利!这个解决方案应该比@amoebob的答案获得更高的票数。 - leventunver

4

我需要使用Spring Messaging 4.2.5和Java 8来保护一个STOMP代理中继到RabbitMQ,并发现原问题的后续代码已经过时。

在启动我的应用程序时,我提供了信任存储环境属性以信任内部自签名证书颁发机构。 java -Djavax.net.ssl.trustStore=/etc/pki/java/server.jks -Djavax.net.ssl.trustStorePassword=xxxxx -jar build/libs/server.war

根据Rossen的答案,我进行了更改

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

为了

@Configuration
public class WebSocketConfig extends DelegatingWebSocketMessageBrokerConfiguration {

然后,在那个WebSocketConfig中,我提供了自己的AbstractBrokerMessageHandler bean:
@Bean
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
    AbstractBrokerMessageHandler handler = super.stompBrokerRelayMessageHandler();
    if (handler instanceof StompBrokerRelayMessageHandler) {
        ((StompBrokerRelayMessageHandler) handler).setTcpClient(new Reactor2TcpClient<>(
                new StompTcpFactory("127.0.0.1", 61614, true)
        ));
    }
    return handler;
}

instanceof条件语句用于在单元测试中简化对NoOpBrokerMessageHandler的使用。

最后,以下是上述使用的StompTcpFactory的实现:

public class StompTcpFactory implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {

    private final Environment environment = new Environment(new SynchronousDispatcherConfigReader());
    private final String host;
    private final int port;
    private final boolean ssl;

    public StompTcpFactory(String host, int port, boolean ssl) {
        this.host = host;
        this.port = port;
        this.ssl = ssl;
    }

    @Override
    public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
        return tcpClientSpec
                .env(environment)
                .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
                .ssl(ssl ? new SslOptions() : null)
                .connect(host, port);
    }

    private static class SynchronousDispatcherConfigReader implements ConfigurationReader {
        @Override
        public ReactorConfiguration read() {
            return new ReactorConfiguration(Collections.emptyList(), "sync", new Properties());
        }
    }

}

太好了,修复成功!其实这应该不难。在返回tcpClientSpec对象之前,我不得不为VM设置一个默认的SSLContext,因为反应堆代码犯了一个经典错误,假设密钥库存在于磁盘上的物理文件中。 - Andy Brown
太完美了!我花了几个小时尝试使用SSL连接将代理中继与AmazonMQ(AWS)连接起来,但没有成功。您的配置像魔法一样奏效。在spring-messaging中是否有此开放问题? - Martin Choraine
@MartinChoraine 我也是!太棒了。顺便说一下,由于连接字符串提供的 stomp+ssl:// 前缀,它似乎不喜欢它。 - jordan.baucke
1
@jordan.baucke,我发布了一个新答案,因为这段代码几乎完美,但是线程处理存在问题。 - Martin Choraine
1
同意@MartinChoraine的观点,这个解决方案存在问题,并会在生产环境中引起严重麻烦。 - leventunver

1

对于所有寻求更新解决方案的人,我成功地以更加简洁的方式解决了这个问题。只需创建并使用带有SSL的自己的TCP客户端即可:

import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.StompReactorNettyCodec;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
class WebsocketConfiguration implements WebSocketMessageBrokerConfigurer {

  private final WebsocketProperties properties;

  @Override
  public void registerStompEndpoints(StompEndpointRegistry registry) {
    registry.addEndpoint("/ws").setAllowedOrigins("*");
    registry.addEndpoint("/ws").withSockJS();
  }

  @Override
  public void configureMessageBroker(MessageBrokerRegistry registry) {

    ReactorNettyTcpClient<byte[]> tcpClient = new ReactorNettyTcpClient<>(configurer -> configurer
            .host(properties.getRelayHost())
            .port(properties.getRelayPort())
            .secure(), new StompReactorNettyCodec());

    registry.enableStompBrokerRelay("/queue", "/topic")
            .setAutoStartup(true)
            .setSystemLogin(properties.getClientLogin())
            .setSystemPasscode(properties.getClientPasscode())
            .setClientLogin(properties.getClientLogin())
            .setClientPasscode(properties.getClientPasscode())
            .setTcpClient(tcpClient);

    registry.setApplicationDestinationPrefixes("/app");
    }
}

在我的情况下(稍有不同),我创建了两个ReactorNettyTcpClient的实现作为Beans,根据环境的不同选择带有/不带有SSL的一个。
依赖项:
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.4</version>
        <relativePath/>
    </parent>
    .
    .
    .
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-stomp</artifactId>
        <version>5.16.2</version>
    </dependency>
    <dependency>
        <groupId>io.projectreactor.netty</groupId>
        <artifactId>reactor-netty</artifactId>
        <version>1.0.8</version>
    </dependency>

我希望目前正在尝试解决这个问题的任何人都能从中受益。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接