Spring WebSocket @SendToSession: 向特定会话发送消息

46

如何向特定会话发送消息?

我有一个未经身份验证的WebSocket连接客户端和Spring servlet。当异步作业结束时,我需要向特定连接发送一条未请求的消息。

@Controller
public class WebsocketTest {


     @Autowired
    public SimpMessageSendingOperations messagingTemplate;

    ExecutorService executor = Executors.newSingleThreadExecutor();

    @MessageMapping("/start")
    public void start(SimpMessageHeaderAccessor accessor) throws Exception {
        String applicantId=accessor.getSessionId();        
        executor.submit(() -> {
            //... slow job
            jobEnd(applicantId);
        });
    }

    public void jobEnd(String sessionId){
        messagingTemplate.convertAndSend("/queue/jobend"); //how to send only to that session?
    }
}

正如您在此代码中所看到的,客户端可以启动异步作业,并且当它完成时,需要结束消息。显然,我只需要向申请人发送消息,而不是广播给所有人。

如果有一个@SendToSession注释或messagingTemplate.convertAndSendToSession方法将会非常好。

更新

我尝试了这个:

messagingTemplate.convertAndSend("/queue/jobend", true, Collections.singletonMap(SimpMessageHeaderAccessor.SESSION_ID_HEADER, sessionId));
但这会向所有会话广播,而不仅仅是指定的那个。
更新2
使用convertAndSendToUser()方法进行测试。 此测试是官方Spring教程的黑客:https://spring.io/guides/gs/messaging-stomp-websocket/ 这是服务器代码:
@Controller
public class WebsocketTest {

    @PostConstruct
    public void init(){
        ScheduledExecutorService statusTimerExecutor=Executors.newSingleThreadScheduledExecutor();
        statusTimerExecutor.scheduleAtFixedRate(new Runnable() {                
            @Override
            public void run() {
                messagingTemplate.convertAndSendToUser("1","/queue/test", new Return("test"));
            }
        }, 5000,5000, TimeUnit.MILLISECONDS);
    } 

     @Autowired
        public SimpMessageSendingOperations messagingTemplate;
}

这是客户端代码:

function connect() {
            var socket = new WebSocket('ws://localhost:8080/hello');
            stompClient = Stomp.over(socket);
            stompClient.connect({}, function(frame) {
                setConnected(true);
                console.log('Connected: ' + frame);
                stompClient.subscribe('/user/queue/test', function(greeting){
                    console.log(JSON.parse(greeting.body));
                });
            });
        }

不幸的是,客户端没有按预期每5000毫秒接收到其每个会话的回复。我确定"1"是第二个连接的有效sessionId,因为我在调试模式下使用了SimpMessageHeaderAccessor.getSessionId()

背景情景

我想为远程作业创建一个进度条,客户端向服务器请求异步作业,并通过从服务器发送的websocket消息检查其进度。这不是文件上传,而是远程计算,因此只有服务器知道每个作业的进度。 我需要向特定会话发送消息,因为每个作业都是由会话启动的。 客户端请求远程计算 服务器启动此作业,并针对每个作业步骤回复申请人客户端其作业进度状态。 客户端获取有关其作业的消息,并构建进度/状态栏。 这就是我需要每个会话消息的原因。 我也可以使用每个用户消息,但Spring不提供每个用户未经请求的消息。(无法使用Spring Websocket发送用户消息

工作解决方案

 __      __ ___   ___  _  __ ___  _  _   ___      ___   ___   _    _   _  _____  ___  ___   _  _ 
 \ \    / // _ \ | _ \| |/ /|_ _|| \| | / __|    / __| / _ \ | |  | | | ||_   _||_ _|/ _ \ | \| |
  \ \/\/ /| (_) ||   /| ' <  | | | .` || (_ |    \__ \| (_) || |__| |_| |  | |   | || (_) || .` |
   \_/\_/  \___/ |_|_\|_|\_\|___||_|\_| \___|    |___/ \___/ |____|\___/   |_|  |___|\___/ |_|\_|

从我的UPDATE2解决方案开始,我必须使用最后一个参数(MessageHeaders)完成convertAndSendToUser方法的转换和发送:

messagingTemplate.convertAndSendToUser("1","/queue/test", new Return("test"), createHeaders("1"));

其中createHeaders()是这个方法:

private MessageHeaders createHeaders(String sessionId) {
        SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
        headerAccessor.setSessionId(sessionId);
        headerAccessor.setLeaveMutable(true);
        return headerAccessor.getMessageHeaders();
    }

我找到了这个:https://jira.spring.io/browse/SPR-12143 - Tobia
我给你写了一个例子。你怎么知道要发送信息给哪个具体的会话? - Aviad
这是一个远程工作管理系统,当控制器接收到远程工作请求时,应该将申请者的会话存储起来,以便将来路由工作状态回复。 - Tobia
Return("test")是什么?模型对象吗? - e-info128
嗨@Tobia,我尝试了你写的解决方案,但在我的情况下它不起作用。我有与你完全相同的情况,需要在后台运行异步任务并通过websocket发送消息以显示进度。对于已认证的用户,它可以正常工作,但对于未经身份验证的用户则无法正常工作。你能帮忙吗? - edeesan
5个回答

49

从Spring 4.1开始,无需创建特定的目的地,已经内置了(请参见SPR-11309)。

假设用户订阅了一个/user/queue/something队列,您可以使用以下方式向单个会话发送消息:

正如在SimpMessageSendingOperations Javadoc中所述,由于您的用户名实际上是sessionId,因此您必须将其作为标头设置,否则DefaultUserDestinationResolver无法路由消息,并将其删除。

SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor
    .create(SimpMessageType.MESSAGE);
headerAccessor.setSessionId(sessionId);
headerAccessor.setLeaveMutable(true);

messagingTemplate.convertAndSendToUser(sessionId,"/queue/something", payload, 
    headerAccessor.getMessageHeaders());

您不需要对用户进行身份验证。


明天我会试一下,这将非常棒。 - Tobia
2
终于搞定了...我应该传递一个头参数到convertAndSendToUser(),让它工作。请更新你的答案,加上最后一个参数,我会接受它作为答案! - Tobia
我不确定你是否需要这样做,或者这是否是Spring中的一个错误。我会调查一下。 - Brian Clozel
你是对的,在这种情况下标题是必需的,我刚刚更新了我的回复。不过我在想,这是否可以作为一种改进,或者我们有强制执行此操作的充分理由。如果您认为有改进的空间,请在此处打开问题:https://jira.spring.io - Brian Clozel
我遵循了你的指示,但是我的客户端仍然没有收到来自服务器发送的事件。 - Akshada
显示剩余7条评论

4

这很复杂,而且在我看来不值得。

你需要根据用户的会话ID为每个用户(即使未经身份验证的用户)创建一个订阅。

假设每个用户仅为他自己订阅了一个唯一的队列:

stompClient.subscribe('/session/specific' + uuid, handler);

在服务器端,在用户订阅之前,您需要通知并发送一条消息给特定的会话,并将其保存到映射中:

    @MessageMapping("/putAnonymousSession/{sessionId}")
    public void start(@DestinationVariable sessionId) throws Exception {
        anonymousUserSession.put(key, sessionId);
    }

在此之后,如果您想向用户发送消息,您需要:

messagingTemplate.convertAndSend("/session/specific" + key); 

但是我不太清楚你想做什么以及如何找到特定的会话(即匿名用户)。


1
这是我的实际解决方案,看起来像是一个变通方法。查看Spring Websocket源代码似乎可以获取每个活动套接字的会话ID,并且理论上似乎可以将消息路由到特定的连接。我将在问题中更新一个场景背景,告诉您我想通过Websockets实现什么。 - Tobia
问题在于你永远不知道要发送到哪个确切的会话。在这种情况下,您只需要将客户端订阅到特定的频道。 - Aviad
它能够这样做是因为任务从客户端消息开始,然后服务器应该启动存储申请者会话 ID 的此任务,当此任务前进时,服务器应向客户端发送带有每个任务存储的会话 ID 的消息。 - Tobia
我明白了。很遗憾,没有这样的功能。您需要在地图中保存作业,客户端应该订阅一个频道。抱歉我帮不上更多的忙。 - Aviad

4

只需在以下位置添加会话ID:

  • 服务器端

    convertAndSendToUser(sessionId,apiName,responseObject);

  • 客户端

    $stomp.subscribe('/user/'+sessionId+'/apiName',handler);

注意:
在服务器端,不要忘记将'/user'添加到终点。


3

我曾经遇到同样的问题,但是提供的解决方案对我没有用,因此我采取了不同的方法:

  1. 修改Web Socket配置,使用户可以通过会话ID进行识别:
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws-endpoint")
                .setHandshakeHandler(new DefaultHandshakeHandler() {
                    
                    @Override
                    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
                        if (request instanceof ServletServerHttpRequest) {
                            ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
                            HttpSession session = servletRequest.getServletRequest().getSession();
                            return new Principal() {
                                @Override
                                public String getName() {
                                    return session.getId();
                                }
                            };
                        } else {
                            return null;
                        }
                    }
                }).withSockJS();
    }

2. 发送消息到该会话ID(不包含头部):
    simpMessagingTemplate.convertAndSendToUser(sessionId, "/queue", payload);

2
最简单的方法是利用@SendToUser中的广播参数。 文档如下:
“是否应将消息发送给与用户关联的所有会话,还是仅发送到处理输入消息的会话。默认情况下,这被设置为true,在这种情况下,消息将广播到所有会话。”
对于您的确切情况,它看起来像这样:
    @MessageMapping("/start")
    @SendToUser(value = "/queue/jobend", broadcast = false)
    //...

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接