Spring Websocket多线程发送消息

7

我正在为我的一个基于Spring的项目使用Spring WebSocket服务器实现。遇到了一个错误,显示“远程端点处于无效状态[TEXT_PARTIAL_WRITING]”。我发现问题是由于同时从不同线程向WebSocket写入造成的。

我是如何暂时解决这个问题的:考虑我已经实现了以下方法:

void sendMessageToSession(WebsocketSession session,String message);

该方法向 WebSocket 会话发送 TextMessage。由于多个线程可以为不同的 WebSocket 会话和消息调用该方法,因此无法使整个方法同步执行。同时也无法将会话放入同步块中(尝试过但失败了)。

不过,我通过以下方式解决了问题。

synchronized(session.getId()){ 
    //sending message;
}

我不再面临这个问题。但是在同步块中使用字符串似乎不是一个好的做法。

那么我还有哪些其他解决方案?什么是发送异步消息的最佳方式?

PS:我已经在连接建立后使用了ConcurrentWebSocketSessionDecorator,并且我正在使用更新的Websocket。但没有帮助。

session = new ConcurrentWebSocketSessionDecorator(session, (int) StaticConfig.MAXIMUM_WS_ASYNC_SEND_TIMEOUT, StaticConfig.MAXIMUM_WS_BINARY_BUFFER_SIZE * 2);

注意 我将我的websocket会话保存在一个映射中,其中键是session.getId,值是session本身。

与其他一些websocket实现不同,Spring websocket的引用在每个消息上似乎并不相等。我通过它们的ID将会话保存在一个映射中,在每个消息上,我检查传递的websocket与我已经放置在映射中的websocket的相等性,结果为false。


据我所知,synchronized(session.getId()) 无法解决您的问题。 - xingbin
@user27149,现在我正在使用它,没有遇到任何异常,系统也运行正常,所以我可以说它确实解决了我的问题(暂时解决,因为我问这个问题是为了找到更好的解决方法)。 - Sepehr GH
是的,我明白您正在寻找更好的解决方案... - xingbin
当您尝试使用 synchronized(session) 时会发生什么? - Warren Dew
@WarrenDew 仍然是同样的错误。还请参考我的注释。 - Sepehr GH
3个回答

8
通过在我持久化会话的地方在我的WebsocketSession后面添加volatile关键字,我解决了这个问题。如果这也是一种不好的做法,我会很高兴知道。但我的想法是当从多个线程写入websocket会话时,这些线程失去了websocket状态,因为它还没有更新,这就是为什么会抛出此异常的原因。
通过添加volatile,我们确保websocket状态已经更新,然后另一个线程使用它,所以写入websocket按预期同步工作。
我创建了一个名为SessionData的类,它保存websocketSession和我需要的所有其他会话数据。
public class SessionData {
    private volatile WebSocketSession websocketSession;
    //...other 
    // getters and setters ...
}

我使用SessionData作为映射的值,其中会话ID是键

当从不同线程获取websocketSession并写入其中时,volatile有助于获取更新的websocketSession。


更新(2020年)

这里有一个关键要点,即每次想要向会话发送消息时,应该使用sessionData.getWebsocketSession.sendMessage(...)。永远不要直接使用会话,这意味着像this这样的代码是一种不良实践

WebSocketSession websocketSession = sessionData.getWebSocketSession();
websocketSession.sendMessage(...);

在这两行代码之间(在您的情况下可能超过2行),您永远不会知道WebSocket会话应用了哪些更改。

而像这样的代码会更好:

sessionData.getWebSocketSession().sendMessage(...);

同时,不要直接发布到在Spring WebSocket MessageHandler中传递给您的会话。否则您很可能会再次遇到该错误。

这就是为什么将WebSocketSessionsessionId映射到SessionData的好习惯。当连接打开时,您可以使用此存储库使用会话ID获取volatile session,而不是直接使用会话。


1
@NikolayShevchenko 我建议您将其放在sessionData类的会话后面。只是考虑概念,我应该说这意味着我们在从映射中获取它之后访问websocketSession的最后状态。但是,如果您将其放在map的后面,则正在访问map的最后状态,这不能保证映射内部的对象的任何内容。只需确保您正在使用ConcurrentHashMap - Sepehr GH
换句话说,这个是好的:sessionData.getWebSocketSession().sendMessage(...),而这个是不好的:WebSocketSession websocketSession = sessionData.getWebSocketSession(); websocketSession.sendMessage() - Sepehr GH
2
这并不是事实。这两种选择在底层没有区别。你需要在一个ConcurrentWebSocketSessionDecorator中包装会话。 - apanloco
这个答案需要更新,getWebSocketSession方法已经不存在了。 - FARS
@FARS 你似乎弄错了。那个方法是用于封装 WebSocket 连接的类。 - Sepehr GH
显示剩余3条评论

7

ConcurrentWebSocketSessionDecorator在多线程中表现出色,它专门为此而设计。

示例代码:


注:您的问题可能在地图实现中。
private final Map<String, SessionData> sessions = new ConcurrentHashMap<>();

@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception 
{
    // Use the following will crash :
    //sessions.put(session.getId(), new SessionData(session));

    // Use ConcurrentWebSocketSessionDecorator is safe :
    sessions.put(session.getId(), new SessionData(new ConcurrentWebSocketSessionDecorator (session, 2000, 4096)));
    super.afterConnectionEstablished(session);
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception
{
    sessions.remove(session.getId());
    super.afterConnectionClosed(session, status); 
}

public void send(WebSocketSession session, String msg) throws MessagingException {
    try {
        session.sendMessage (new TextMessage(msg));
    } catch (IOException ex) {
        throw new MessagingException(ex.getMessage());
    }
}

为了轻松测试多线程行为:
    public void sendMT(WebSocketSession session, String msg) throws MessagingException{
    for (int i=0; i<3; i++){
        new Thread(){
          @Override
          public void run(){
              send (session, msg);
        }.start();  
    }
}

这是正确的答案... 上面的“update 2020”被接受的答案不是线程安全的。 - apanloco
1
是的,被接受的答案有点奇怪。 - Kimi Chiu
这个例子更新了sessions但从未读取它。 - Pavel Vlasov
正如问题中所看到的,我已经封装了WebSocket会话。但是再次阅读我的答案,我可以说这可能很容易就是地图的问题。接受这个作为答案 :) - Sepehr GH
需要MRE,不知道afterConnectionClosed方法是从哪里继承的。 - FARS

1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接