StompFrameHandler无法从消息中获取有效载荷

5

最终我成功连接到了WebSocket的端点,但无法提取消息负载。我可以获取头信息,但是无法识别负载。

我的WebSocket客户端如下所示:

    WebSocketTransport webSocketTransport = new WebSocketTransport(standardWebSocketClient);
    SockJsClient sockJsClient = new SockJsClient(Arrays.asList(webSocketTransport));
    WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);

    stompClient.setMessageConverter(new StringMessageConverter());

    StompSessionHandler sessionHandler = new MyStompSessionHandler();
    ListenableFuture<StompSession> connect = stompClient.connect(URL, sessionHandler);

        StompSession stompSession = connect.get();
        System.out.println("sessionId: " + stompSession.getSessionId());

        String path = "/queue/orders";

        stompSession.subscribe(path, new MySimpleStompFrameHandler());

Stomp帧处理程序:

    private class MySimpleStompFrameHandler implements StompFrameHandler {

    @Override
    public Type getPayloadType(StompHeaders stompHeaders) {
        System.out.println("Headers " + stompHeaders.toString());
        return String.class;
    }

    @Override
    public void handleFrame(StompHeaders stompHeaders, Object payload) {
        System.out.println("Msg " + payload.toString());
        completableFuture.complete(payload.toString());
    }
}

我在终端中使用了标头,但 handleFrame 方法没有任何输出。 有什么想法吗?
编辑: 调试后我发现问题出在 DefaultStompSession 类中,它使用了我的 frameHandler 实现。
    private void invokeHandler(StompFrameHandler handler, Message<byte[]> message, StompHeaders stompHeaders) {
    if (message.getPayload().length == 0) {
        handler.handleFrame(stompHeaders, null);
        return;
    }
    Type type = handler.getPayloadType(stompHeaders);
    Class<?> payloadType = ResolvableType.forType(type).resolve();
    Object object = getMessageConverter().fromMessage(message, payloadType);
    if (object == null) {
        throw new MessageConversionException("No suitable converter, payloadType=" + payloadType +
                ", handlerType=" + handler.getClass());
    }
    handler.handleFrame(stompHeaders, object);
}

问题出在这行代码之后:Type type = handler.getPayloadType(stompHeaders); 之后没有任何其他操作,我的程序就直接结束了,所以 handleFrame() 没有被执行。 这里有什么问题?可能是 getPayloadType 返回的 Type 类型有问题 - 我选择了 String 类型,因为我认为每个消息都可以表示为字符串。

仍在苦苦挣扎这个问题,有什么想法吗? - kris82pl
4个回答

8

我遇到了类似的问题。

问题在于我指定了stompClient.setMessageConverter(new StringMessageConverter());,而传递的却是 JSON 载荷。

解决方案(针对我的问题):stompClient.setMessageConverter(new MappingJackson2MessageConverter());

您可以采取以下措施以找到解决方案:

  • Throw (or log) when an exception is triggered. For your case, you should add an override for the following method in MyStompSessionHandler:

     @Override
     public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
         throw new RuntimeException("Failure in WebSocket handling", exception);
     }
    
  • While debugging invokeHandler, check the content of the payload. If you are able to execute the code, you can do this by executing new String((byte[]) message.getPayload()).


你救了我的一天,我使用了这行代码 new String((byte[]) message.getPayload()),并且我能够完成自己的实现,谢谢。 - Jairo Cordero

0
需要定义MessageConverter,然后实现getPayloadType方法,使用JSON对象类型。
SockJsClient sockJsClient = new SockJsClient(transports);
WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
stompClient.setMessageConverter(**new MappingJackson2MessageConverter()**);

....

public class SockJsWebsocketSubscriptionHandler implements StompFrameHandler {
    @Override
    public Type getPayloadType(StompHeaders headers) {
        return **HelloMessage.class;**
    }
...
}

0

我明白你的意思,你需要扩展StompSessionHandlerAdapter并实现StompFrameHandler

我为你尝试了一些:

public class MySimpleStompFrameHandler extends StompSessionHandlerAdapter implements StompFrameHandler {

private Logger logger = LogManager.getLogger(MyStompSessionHandler.class);

@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
    System.out.println("Connected");
}

@Override
public void handleException(StompSession session, StompCommand command, StompHeaders 
 headers, byte[] payload, Throwable exception) {
    logger.error("Got an exception", exception);
}

@Override
public Type getPayloadType(StompHeaders headers) {
    return <payload_type>.class;
}

@Override
public void handleFrame(StompHeaders headers, Object payload) {
    <payload_type> msg = (<payload_type>) payload;
    System.out.println(msg);
    logger.info("Received : " + msg);
}

}

payload_type 是您想要将流数据转换成的用户定义类时。

准备好了!


0

我分享我的实现,希望有用。 我没有设置消息转换器,而是自定义了handleFrame方法的实现。

    WebSocketClient client = new StandardWebSocketClient();
    WebSocketStompClient stompClient = new WebSocketStompClient(client);
    StompSessionHandler sessionHandler = new MyStompSessionHandler(prop);
    WebSocketHttpHeaders webSocketHttpHeaders = new WebSocketHttpHeaders();
    webSocketHttpHeaders.add("Authorization", "Bearer " + token);

    StompSession session = stompClient.connect(endpoint, webSocketHttpHeaders, sessionHandler).get();

然后处理发送的消息,在我的情况下是一个JSON字符串对象

    @Override
    public Type getPayloadType(StompHeaders headers) {
        return null;
    }

    @Override
    public void handleFrame(StompHeaders headers, Object payload) {
        String cadena = new String((byte[]) payload);
        JsonParser parser = new JsonParser();
        JsonObject obj = parser.parse(cadena).getAsJsonObject();
        System.out.println(obj.toString());
    }   

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接