如何使用Spring WebSocket向自定义用户发送自定义消息?

13

我对Spring WebSocket不太熟悉。我想向客户端发送产品更改。为此,我希望按以下方式操作:客户端创建套接字连接并订阅目标:

var socket = new SockJS('/websocket');
var stompClient = Stomp.over(socket);

stompClient.connect({}, function (frame) {
    stompClient.subscribe('/product/changes', function (scoredata) {
        // We received product changes
    });
});
//Send Ajax request and say server I want to know product with id=5 changes.
sendAjaxRequest(5);

我已将Spring应用程序配置如下:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/product/");
        registry.setApplicationDestinationPrefixes("/app");
    }
}

现在我需要以下方法:

@RestController
public class ProductController {

    @GetMapping("product-{id}")
    public void startSubscribe(@PathVariable("id") Long id) {

        // register current websocket session with product id and 
        // then with convertAndSendToUser send changes to current user.
    }

}

我该如何实现它?

3个回答

17

我的第一个问题是,为什么在已经成功集成了使用Stomp的WebSockets后,你还要尝试向REST控制器发送HTTP请求呢?如果我正确理解你的用例,那么目前我可以想到三种解决方案。

解决方案1 (socket session ↔ product id)

您可以通过打开的Websocket连接直接从客户端向服务器发送请求。Spring会确定哪个Websocket会话发出了该调用,并且您可以实现您的业务逻辑。您需要激活另一个代理程序名为"/queue",并指定当订阅不是广播时所需的用户目标前缀。在客户端上,您还必须更改您的订阅路径。最后,您必须创建一个带有@Controller注释的类,其中包含来自连接的客户端的消息映射以接收消息。

服务器配置

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").withSockJS();
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/queue", "/product");  // <- added "/queue"
        registry.setApplicationDestinationPrefixes("/app");
        registry.setUserDestinationPrefix("/user");
    }
}

服务器控制器

@Controller
public class WebSocketContoller{
    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    @MessageMapping("/product/register")
    public void register(@Payload Long productId, @Header("simpSessionId") String sessionId) {
        // register current websocket session with product id and 
        // then with convertAndSendToUser send changes to current user.

        // Example of how to send a message to the user using the sessionId
        String response = "This could also be one of your product objects of type Product";
        SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
        headerAccessor.setSessionId(sessionId);
        headerAccessor.setLeaveMutable(true);

        messagingTemplate.convertAndSendToUser(sessionId,"/queue/product/changes", response, headerAccessor.getMessageHeaders());
    }
}

客户订阅更改

stompClient.subscribe('/user/queue/product/changes', function (scoredata) {
    // We received product changes
});

如需详细信息,请查看此答案:https://stackoverflow.com/a/26288475/11133168


解决方案2(主要是产品ID)

但是,如果您确实想考虑使用rest控制器来开始注册进程,或者它不能满足您的要求,那么您应该查看下面的链接。Spring还能够通过公开的SimpUserRegistry bean跟踪活动的websocket会话及其用户。但是,根据您应用程序的安全性,您将需要为客户端输入通道配置自定义ChannelInterceptor适配器以确定用户。 有关详细信息和代码示例,请参见此答案:https://dev59.com/fKPia4cB1Zd3GeqP4dMw#45359294


解决方案3(产品ID主题)

您也可以订阅特定的产品ID主题,这样您甚至不需要知道哪个用户想要被通知更改特定产品。

客户端订阅更改

//e.g if you want to be notified about changes for products with id 5 
stompClient.subscribe('/product/changes/5', function (scoredata) {
    // We received product changes
});

服务器服务示例

@Service
public class WebSocketProductService{

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    // This would be the method which should inform your clients about specific product     
    // changes, instead of the String parameters a Product object should be used instead, 
    // you have to call this method yourself on product changes or schedule it or sth.
    public void sendProductChange(String product, String productId) {
        this.simpMessagingTemplate.convertAndSend("/product/changes/"+productId, product);
    }
}

服务器控制器

如果您想管理产品ID订阅列表,则需要该控制器。如解决方案1中所述,您需要一个带有@ Controller注释的类,其中包含一个带有@ SubscribeMapping注释的方法。如果客户端尝试订阅指定路径,则会调用此方法。

@Controller
public class WebSocketContoller{
    @SubscribeMapping("/product/changes/{productId}")
    public void productIdSubscription(@DestinationVariable Long productId) {
        //Manage your product id subscription list e.g.
    }
}

用户可以在一个会话中听取不同的产品。 - Morteza Malvandi
1
为什么我们需要在第一个参数和头部中都指定sessionid两次?即:messagingTemplate.convertAndSendToUser(sessionId,"/queue/product/changes", response, headerAccessor.getMessageHeaders()) - GabrielBB

3
如果您想仅在用户请求时向用户发送产品更新,则可以使用普通的HTTP请求。但我理解您想基于特定于用户的业务逻辑推送通知。您还必须实现Spring Security来验证用户。
解决方案
我建议在后端中使用user_product_updates(user_id,product_id)表添加此业务逻辑-每行对应于用户ID想要订阅更新的product_id。
@GetMapping("product-{id}")
public void startSubscribe(@PathVariable("id") Long id) {
    // Save this custom setting into your models
}

现在您可以运行一个预定的后端作业(可以是基于您推送通知的业务逻辑的cron作业),向您的用户发送更新:
@Autowired 
org.springframework.messaging.simp.SimpMessagingTemplate simpMessagingTemplate;   

@Scheduled(cron = "0 0 1 * * ?") // based on your business logic (say daily at 12:01 am)
public void scheduleTaskUsingCronExpression() {
   // loop through user_product_updates table and construct "data"
   // username is from your spring security username (principal.getName())
   simpMessagingTemplate.convertAndSendToUser(username, "/queue/products", data);
}

将来,您可能希望添加一些缓存以进行优化(特别是从 product_id 获取产品信息),以使一切顺利运行。


摘要

您的Web Socket配置:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app")
            .setUserDestinationPrefix("/user")
            .enableSimpleBroker("/topic", "/queue", "/product");
    }
}

您在前端应用程序中的监听器可以如下所示:

that.stompClient.subscribe("/user/queue/products", (message) => {
    if (message.body) {
      // We received product changes
    }
});

用户将注册产品更新:

@GetMapping("product-{id}")
public void startSubscribe(@PathVariable("id") Long id) {
    // Save to your persistence module
    // (that the particular user wants updates from such-and-such products)
}

后端调度程序将在更新可用时发送更新:

@Scheduled(cron = "0 0 1 * * ?") // based on your business logic
public void scheduleTaskUsingCronExpression() {
   // loop through user_product_updates table and construct "data"
   // username is from your spring security username (principal.getName())
   template.convertAndSendToUser(username, "/queue/products", data);
}

1
感谢您的回答,并为您的回答添加一些注释: 1- 这种方法具有处理负荷, 2- 它有延迟, 3- 需要实现安全性 4- 再次感谢您的回答 :)。 - Morteza Malvandi
如果你的流量或用户群体非常庞大,那么就会有负载。convertAndSendToUser(username, "/queue/products", data)不需要Spring Security,它只是从中获取数据。你可以很好地从stompclient中获取websocket用户名并发送特定于用户的更新。 :) - kukkuz

0

Spring 文档 是学习 Web Socket 概念的一个不错的起点。 要发送到客户端,可以使用SimpMessageSendingOperations

@Autowired
private SimpMessageSendingOperations messageSendingOperations;

从控制器方法中,可以通过以下方式发送消息:

messageSendingOperations.convertAndSendToUser(websocketUserId, "/product/changes", messageObject);

这可以是当前身份验证的主要原则(由Spring安全支持)。您可能还需要处理websocket的安全配置。https://docs.spring.io/spring-security/site/docs/4.0.x/reference/html/websocket.html - Shah Minul Amin
当套接字连接建立时,它会生成一个ID,我该如何访问它? - Morteza Malvandi
获取消息头信息的一种方法是使用SimpMessageHeaderAccessor(https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/messaging/simp/SimpMessageHeaderAccessor.html)。它有一个用于获取当前会话ID的方法。您可以在控制器方法中注入SimpMessageHeaderAccessor,例如:public void startSubscribe(SimpMessageHeaderAccessor headerAccessor,@PathVariable(“id”)Long id) - Shah Minul Amin
getSessionId() 和 getSubscriptionId() 方法返回 null。 - Morteza Malvandi

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接