水平扩展WebSockets与水平扩展仅基于HTTP的无状态/有状态应用程序实际上非常不同。
水平扩展无状态HTTP应用程序:只需在不同的计算机上旋转一些应用程序实例并在它们前面放置负载均衡器即可。有许多不同的负载均衡器解决方案,例如HAProxy、Nginx等。如果您处于像AWS这样的云环境中,您还可以使用托管解决方案,例如弹性负载均衡器。
水平扩展有状态HTTP应用程序:如果我们总能使所有应用程序始终保持无状态那就太好了,但不幸的是,这并不总是可能的。因此,在处理有状态的HTTP应用程序时,您必须关心HTTP会话,这基本上是每个不同客户端的本地存储,其中Web服务器可以存储跨不同HTTP请求保留的数据(例如处理购物车)。在这种情况下,在水平扩展时,您应该意识到,正如我所说,这是一个本地存储,因此ServerA将无法处理在ServerB上的HTTP会话。换句话说,如果由于任何原因由ServerA为Client1提供服务突然开始由ServerB提供服务,则他的HTTP会话将丢失(购物车将消失!)。原因可能是节点故障甚至部署。 为了解决这个问题,您不能仅在本地保留HTTP会话,也就是说,必须将它们存储在另一个外部组件上。有几个组件可以处理此操作,例如任何关系型数据库,但那实际上是一种开销。一些NoSQL数据库可以很好地处理这种键值行为,例如Redis。 现在,借助存储在Redis中的HTTP会话,如果客户端开始由另一台服务器提供服务,则它将从Redis中提取客户端的HTTP会话并将其加载到内存中,因此一切都将继续工作,并且用户不会再丢失他的HTTP会话。 您可以使用Spring Session轻松地将HTTP会话存储在Redis中。
横向扩展 WebSocket 应用:当建立 WebSocket 连接时,服务器必须保持与客户端的连接打开,以便它们可以在双向交换数据。当客户端侦听目的地,例如 "/topic/public.messages" 时,我们称客户端已订阅此目的地。在 Spring 中,当您使用 simpleBroker
方法时,订阅是 保存在内存中的,那么例如如果客户端 Client1 由 ServerA 提供服务,并想使用 WebSocket 向由 ServerB 提供服务的 Client2 发送消息,那么您已经知道答案了!消息不会传递到 Client2,因为 Server1 甚至不知道 Client2 的订阅。
因此,为了解决这个问题,您需要再次将 WebSockets 订阅外部化。由于您使用 STOMP 作为子协议,因此需要一个外部组件,可以充当外部 STOMP 代理。有很多工具可以做到这一点,但我建议使用 RabbitMQ。
现在,您必须更改 Spring 配置,以便它不会在内存中保存订阅。相反,它将把订阅委托给外部 STOMP 代理。您可以通过一些基本配置轻松实现这一点,例如 enableStompBrokerRelay
。
需要注意的重要事项是HTTP 会话与 WebSocket 会话不同。使用 Spring Session 将 HTTP 会话存储在 Redis 中与横向扩展 WebSockets 没有任何关系。
我使用 Spring Boot 编写了一个完整的 Web 聊天应用程序(以及更多),使用 RabbitMQ 作为完整的外部 STOMP 代理,并且它在 GitHub 上公开,所以请克隆它,在您的机器上运行应用程序并查看代码细节。
当涉及到WebSocket连接丢失时,Spring并没有太多可做的。实际上,重新连接必须由客户端实现重新连接回调函数发起请求(这是WebSocket握手流程,客户端必须启动握手,而不是服务器)。有一些客户端库可以透明地为您处理这个问题,但这不是SockJS的情况。在聊天应用程序中,我也实现了这个重新连接特性。
您的需求可以分为2个子任务:
跨多个节点维护会话信息:您可以尝试使用基于Redis的Spring Sessions集群(参见:HttpSession with Redis)。这非常简单,已经支持Spring Websockets(参见:Spring Session & WebSockets)。
处理多个Tomcat实例的Websockets流量:有几种方法可以实现。
UserSessionRegistry
(例如:使用Redis :D )。默认实现DefaultUserSessionRegistry
使用内存存储。更新:我已经编写了一个使用Redis的简单实现,如果您感兴趣,请尝试
要配置全功能代理(代理中继),您可以尝试:
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
...
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic", "/queue")
.setRelayHost("localhost") // broker host
.setRelayPort(61613) // broker port
;
config.setApplicationDestinationPrefixes("/app");
}
@Bean
public UserSessionRegistry userSessionRegistry() {
return new RedisUserSessionRegistry(redisConnectionFactory);
}
...
}
和
import java.util.Set;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.BoundHashOperations;
import org.springframework.data.redis.core.BoundSetOperations;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.messaging.simp.user.UserSessionRegistry;
import org.springframework.util.Assert;
/**
* An implementation of {@link UserSessionRegistry} backed by Redis.
* @author thanh
*/
public class RedisUserSessionRegistry implements UserSessionRegistry {
/**
* The prefix for each key of the Redis Set representing a user's sessions. The suffix is the unique user id.
*/
static final String BOUNDED_HASH_KEY_PREFIX = "spring:websockets:users:";
private final RedisOperations<String, String> sessionRedisOperations;
@SuppressWarnings("unchecked")
public RedisUserSessionRegistry(RedisConnectionFactory redisConnectionFactory) {
this(createDefaultTemplate(redisConnectionFactory));
}
public RedisUserSessionRegistry(RedisOperations<String, String> sessionRedisOperations) {
Assert.notNull(sessionRedisOperations, "sessionRedisOperations cannot be null");
this.sessionRedisOperations = sessionRedisOperations;
}
@Override
public Set<String> getSessionIds(String user) {
Set<String> entries = getSessionBoundHashOperations(user).members();
return (entries != null) ? entries : Collections.<String>emptySet();
}
@Override
public void registerSessionId(String user, String sessionId) {
getSessionBoundHashOperations(user).add(sessionId);
}
@Override
public void unregisterSessionId(String user, String sessionId) {
getSessionBoundHashOperations(user).remove(sessionId);
}
/**
* Gets the {@link BoundHashOperations} to operate on a username
*/
private BoundSetOperations<String, String> getSessionBoundHashOperations(String username) {
String key = getKey(username);
return this.sessionRedisOperations.boundSetOps(key);
}
/**
* Gets the Hash key for this user by prefixing it appropriately.
*/
static String getKey(String username) {
return BOUNDED_HASH_KEY_PREFIX + username;
}
@SuppressWarnings("rawtypes")
private static RedisTemplate createDefaultTemplate(RedisConnectionFactory connectionFactory) {
Assert.notNull(connectionFactory, "connectionFactory cannot be null");
StringRedisTemplate template = new StringRedisTemplate(connectionFactory);
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new StringRedisSerializer());
template.afterPropertiesSet();
return template;
}
}
跨多个节点维护会话信息:
假设我们有2个服务器主机,由负载均衡器备份。
Websockets是从浏览器到特定服务器主机(例如host1)的套接字连接。
现在,如果host1崩溃,来自负载均衡器-host1的套接字连接将中断。Spring如何重新打开从负载均衡器到host2的相同websocket连接?浏览器不应该打开新的websocket连接。