Spring WebSockets ActiveMQ convertAndSendToUser

3
我有一个使用STOMP over WebSockets通信的Spring Boot应用程序(Jhipster),最近添加了一个ActiveMQ服务器来处理水平扩展,使用Amazon自动缩放组/负载均衡器。我使用convertAndSendToUser()方法,它适用于单个应用程序实例,以定位经过身份验证的用户的“个人队列”,以便只有他们收到消息。然而,当我在负载均衡器后面启动应用程序时,我发现只有在生成他们的websocket代理连接(到代理)所在的服务器上的事件时,消息才会被发送给用户。如何确保消息通过ActiveMQ传递到用户实际“连接”的应用程序实例,而不管哪个实例接收到执行convertAndSendToUser()事件的HTTP请求?以下是我的StompBrokerRelayMessageHandler供参考:
@Bean
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
    StompBrokerRelayMessageHandler handler = (StompBrokerRelayMessageHandler) super.stompBrokerRelayMessageHandler();
    handler.setTcpClient(new Reactor2TcpClient<>(
        new StompTcpFactory(orgProperties.getAws().getAmazonMq().getStompRelayHost(),
            orgProperties.getAws().getAmazonMq().getStompRelayPort(), orgProperties.getAws().getAmazonMq
            ().getSsl())
    ));

    return handler;
}

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
    config.enableStompBrokerRelay("/queue", "/topic")
        .setSystemLogin(orgProperties.getAws().getAmazonMq().getStompRelayHostUser())
        .setSystemPasscode(orgProperties.getAws().getAmazonMq().getStompRelayHostPass())
        .setClientLogin(orgProperties.getAws().getAmazonMq().getStompRelayHostUser())
        .setClientPasscode(orgProperties.getAws().getAmazonMq().getStompRelayHostPass());

    config.setApplicationDestinationPrefixes("/app");
}

通过检查SessionSubscribeEvent中的标头,我已经找到了与ActiveMQ生成的队列对应的名称,这是在用户订阅用户队列时侦听器中生成的,称为simpSessionId
@Override
@EventListener({SessionSubscribeEvent.class})
public void onSessionSubscribeEvent(SessionSubscribeEvent event) {
    log.debug("Session Subscribe Event:" +
        "{}", event.getMessage().getHeaders().toString());
}

在ActiveMQ中可以找到相应的队列,格式为:{simpDestination}-user{simpSessionId}

我能否将sessionId保存在键值对中,然后只是将消息推送到该主题通道上?


我还发现在CONNECT / SUBSCRIBE帧中设置ActiveMQ特定的STOMP属性可以创建持久订阅者(可能性)。如果我设置了这些属性,Spring是否会理解路由? client-idsubcriptionName

我遇到了一个相关的问题,使用springboot连接AWS ActiveMQ实例时出现问题(嵌入式代理工作正常)。您在配置中使用orgProperties.getAws().getAmazonMq().*部分的库是什么?它是Java的AWS SDK之一吗? - Ryan Zakariudakis
1
@RyanZakariudakis,“orgProperties”的引用只是引用了来自Spring外部化配置的连接端点和凭据字符串。您可能会遇到一个问题,就是使用他们提供的“stomp+ssl://”连接字符串时,我在我的连接字符串中省略了整个前缀。 - jordan.baucke
1个回答

3
修改 MessageBrokerReigstry 配置 解决了该问题:
config.enableStompBrokerRelay("/queue", "/topic")
            .setUserDestinationBroadcast("/topic/registry.broadcast")

根据文档第4.4.13节中的这一段:
在多应用服务器场景下,用户目标可能无法解析,因为用户连接到不同的服务器。在这种情况下,您可以配置目标广播未解决的消息,以便其他服务器有机会尝试。这可以通过Java配置中MessageBrokerRegistry的userDestinationBroadcast属性和XML中message-broker元素的user-destination-broadcast属性来完成。
我没有看到任何关于为什么/topic/registry.broadcast是正确的“主题”目标的文档,但我发现了它的各种迭代:
1. websocket sessions sample doesn't cluster.. spring-session-1.2.2 2. What is MultiServerUserRegistry in spring websocket? 3. Spring websocket - sendToUser from a cluster does not work from backup server

谢谢,我以为RabbitMQ默认会这样做。 - M_K

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接