我正在尝试使用Springframework SimpMessagingTemplate(默认的Stomp实现)流式传输时间序列数据,向SockJS客户端订阅的主题广播消息。但是,消息被无序地接收到。服务器是单线程的,按照它们的时间戳以升序发送消息。但是客户端以某种方式接收到了无序的消息。
我正在使用stompjs和springframework的最新版本(4.1.6 release)。
我正在尝试使用Springframework SimpMessagingTemplate(默认的Stomp实现)流式传输时间序列数据,向SockJS客户端订阅的主题广播消息。但是,消息被无序地接收到。服务器是单线程的,按照它们的时间戳以升序发送消息。但是客户端以某种方式接收到了无序的消息。
我正在使用stompjs和springframework的最新版本(4.1.6 release)。
看起来有一个内置的带条纹的执行器,所以只需启用它:
@Override
protected void configureMessageBroker(MessageBrokerRegistry registry) {
// ...
registry.setPreservePublishOrder(true);
}
找到了这个问题的根源。从应用程序实现的角度来看,消息是按照“正确”的顺序发送的(即,在一个线程或至少线程安全的方式中调用convertAndSend())。然而,Springframework web socket使用了reactor-tcp实现,它将从线程池中的clientOutboundChannel处理消息。因此,消息可以以不同于它们到达的顺序写入tcp套接字。当我为clientOutboundChannel限制了1个线程时,顺序得到了保留。
这个问题不在SocketJS中,而是当前Spring web socket设计的限制。
这是一个Spring Web Socket的设计问题。为了按照正确的顺序接收消息,您需要将Websocket客户端的corePoolSize设置为1。
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketMessageBrokerConfiguration extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
registration.taskExecutor().corePoolSize(1);
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.taskExecutor().corePoolSize(1);
}
}
更新
请参考@Jason的回答。Spring 5.1有一个setPreservePublishOrder()函数,可以根据客户端ID对消息进行排序。
我也遇到过这个问题。我不想将线程池大小限制为1,因为这会给我的应用程序带来额外的开销。相反,我使用了StripedExecutorService来处理进出我的应用程序的消息。这种类型的执行器服务保证了具有相同条纹的任务的有序处理。对于我来说,我使用WebSocket会话ID作为条纹。通过在入站、代理和出站通道上注册此执行器,即可保证有序消息。明智地选择您的条纹。