我会使用Zookeeper+Norbert来了解哪些主机正在上下线:
http://www.ibm.com/developerworks/library/j-zookeeper/
现在,我的聊天室服务器群中的每个节点都可以知道逻辑集群中的所有主机。当节点离线(或上线)时,它们将得到回调。任何节点现在都可以保持当前群集成员的排序列表,对聊天室ID进行哈希,然后对列表大小进行取模以获取列表中应承载任何给定聊天室的节点的索引。我们可以添加1并重新哈希以选择第二个索引(需要循环直到获得新的索引)来计算用于冗余的第二个承载聊天室的主机。在这两个聊天室主机上,有一个聊天室演员,它只是将所有聊天消息转发给作为聊天室成员的每个Websocket演员。
现在,我们可以通过自定义Akka路由器通过两个活动聊天室演员发送聊天消息。客户端只需发送一次消息,路由器将执行哈希模数并发送到两个远程聊天室演员。我将使用Twitter Snowflake算法为发送的消息生成唯一的64位ID。请参见以下链接中code中nextId()方法中的算法。数据中心ID和workerId可以使用Norbert属性设置,以确保在不同服务器上生成不冲突的ID。
https://github.com/twitter/snowflake/blob/master/src/main/scala/com/twitter/service/snowflake/IdWorker.scala
现在,每个客户端终端都会通过两个活动聊天室角色接收每条消息的两个副本。在每个Websocket客户端角色中,我会取消位掩码以了解发送消息的数据中心ID + workerId号码,并跟踪群集中每个主机看到的最高聊天消息编号。然后,我将忽略任何不高于给定发送方主机的给定客户端已经看到的内容的消息。这将消除通过两个活动聊天室角色传入的一对消息的重复。
到目前为止,一切顺利;如果任何节点死亡,我们不会失去聊天室的一个幸存副本。消息将通过第二个聊天室自动无间断地流动。
接下来我们需要处理节点从集群中掉出或重新加入集群的情况。我们将在每个节点内获取Norbert回调,以通知我们有关集群成员更改的情况。在此回调上,我们可以通过自定义路由器发送一个Akka消息,说明新的成员列表和当前主机名。当前主机上的自定义路由将看到该消息并更新其状态,以了解新的集群成员身份以计算通过哪些节点发送任何给定的聊天室流量的新组合。这种新集群成员身份的确认将由路由器发送到所有节点,以便每个服务器都能跟踪所有服务器已经赶上了成员身份的变化,并且现在正在正确地发送消息。
在成员身份更改后,幸存的聊天室可能仍然处于活动状态。如果是这种情况,所有节点上的所有路由器将继续正常发送给它,但也会向新的第二个聊天室主机发送一条假设性的消息。第二个聊天室可能还没有启动,但这不是问题,因为消息将通过幸存者进行流动。如果存活的聊天室在成员身份更改后不再活动,则所有主机上的所有路由器首先将发送到三个主机;幸存者和两个新节点。可以使用Akka死亡监视机制,以便所有节点最终可以看到幸存的聊天室关闭,并返回通过两个主机路由聊天流量。
接下来,我们需要将聊天室从幸存服务器迁移到一个或两个新的主机上,具体情况取决于实际情况。幸存的聊天室 actor 最终会收到一条消息,告知它有关新群集成员的信息。它将首先向新节点发送聊天室成员的副本。此消息将在新节点上创建具有正确成员身份的新聊天室 actor 副本。如果幸存者不再是应该保留聊天室的两个节点之一,它将进入退役模式。在退役模式下,它仅会将任何消息转发给新的主要和次要节点,而不会转发给任何聊天室成员。Akka 消息转发非常适合这种情况。
退役聊天室将监听每个节点发布的 Norbert 群集成员确认消息。最终,它将看到群集中的所有节点都已确认了新的群集成员。然后它就知道自己将不再接收到任何要转发的消息。此时,它可以自我销毁。使用 Akka 热交换功能非常适合实现退役行为。
到目前为止,我们拥有了一个弹性的消息传递设置,它不会在节点崩溃时丢失消息。当集群成员身份更改时,我们将获得一个内部节点流量的峰值,以将聊天室复制到新节点。我们还有一些残留的内部节点消息转发,直到所有服务器都跟上了哪些聊天室移动到了哪些服务器。如果我们想要扩展系统,我们可以等待用户流量低谷,然后打开一个新节点。聊天室将自动重新分配到新节点。
上述描述是基于阅读以下论文并将其转化为Akka概念:
https://www.dropbox.com/s/iihpq9bjcfver07/VLDB-Paper.pdf