在集群中发现Akka actors

43

我最近一直在努力理解Akka和基于actor的系统的概念。虽然我现在对Akka的基础知识有相当好的了解,但是当涉及到集群和远程actors时,我仍然遇到了一些困难。

我将尝试使用Play Framework 2.0附带的WebSocket聊天示例来说明问题:有一个持有WebSockets并保留当前连接用户列表的actor,这些actors基本上从技术和逻辑上代表了聊天室。只要单个服务器上运行一个聊天室,这个actor就能完美地运行。

现在我正在尝试理解当我们谈论许多动态聊天室(可以随时打开/关闭)在一组服务器上运行时(根据当前需求添加或删除单个节点),此示例应如何扩展。在这种情况下,用户A可以连接到服务器1,而用户B可以连接到服务器2。两者可能在同一个聊天室中交谈。在每个服务器上仍将存在一个actor(每个聊天室?)用于保留WebSocket实例以接收和发布事件(消息)给正确的用户。但是,在逻辑上,只应该有一个聊天室actor在服务器1或服务器2上保留当前连接用户的列表(或类似任务)。

那么,您会如何实现这一点,最好使用“纯akka”,而无需添加其他消息系统,例如ZeroMQ或RabbitMQ?

这是我目前想到的,请告诉我这是否有意义:

  1. 用户A连接到服务器1,并分配了一个持有他WebSocket的actor。
  2. 该actor检查(使用Router?EventBus?还是其他什么?)是否在任何已连接集群节点上存在一个“聊天室actor”以进行活动聊天室。如果不存在,则它将以某种方式请求创建新的聊天室actor,并将来自/发送到此actor的未来聊天消息。
  3. 用户B也连接并为其WebSocket分配了一个actor。
  4. 它还检查请求的聊天室是否存在演员,并在服务器1上找到它。
  5. 服务器1上的聊天室演员现在充当给定聊天室的中心枢纽,向所有“连接”的聊天成员演员发送消息并分发传入的消息。
  6. 如果服务器2崩溃,聊天室演员将不得不在某种方式下重新创建/移动到服务器2,尽管这不是我目前的主要关注点。我想知道的是如何使用Akka的工具集在各个基本独立的机器上进行演员的动态发现。

    我已经看了相当长时间的Akka文档,所以也许我错过了一些显而易见的东西。如果是这样,请启示我 :-)

3个回答

13
我正在开发一个私人项目,它基本上是聊天室示例的扩展版本,我在使用Akka和整个“分散式”思想时也遇到了一些启动问题。因此,我可以告诉你我如何“解决”我的扩展聊天室:
我想要一个服务器,可以轻松地部署多次,而不需要太多额外的配置。我正在使用redis作为所有开放用户会话(他们ActorRefs的简单序列化)和所有聊天室的存储。
服务器有以下actor:
- WebsocketSession:它保存与一个用户的连接,并处理来自用户的请求并将系统消息转发给用户。 - ChatroomManager:这是中央广播器,部署在服务器的每个实例上。如果用户想要向聊天室发送消息,则WebSocketSession-Actor将所有信息发送到ChatroomManager-Actor,然后广播该消息给聊天室的所有成员。
这是我的步骤:
- 用户A连接到服务器1,该服务器分配一个新的WebsocketSession。此actor将该actor的绝对路径插入redis。 - 用户A加入聊天室X,该聊天室也将其绝对路径(我将其用作用户会话的唯一ID)插入redis(每个聊天室都有一个“connections”集合)。 - 用户B连接到服务器2 -> redis - 用户B加入聊天室X -> redis - 用户B向聊天室X发送消息:用户B将其消息通过Websocket发送给他的会话actor,该actor(经过一些检查后)向ChatroomManager发送一个actor-message。这个Actor实际上从redis中检索聊天室的用户列表(使用akka的actorFor方法使用的绝对路径),然后将消息发送到每个会话actor。这些会话actors然后写入它们的Websockets。
在每个ChatroomManager-actor中,我执行了一些ActorRef缓存,这提高了额外的速度。我认为这与你的方法不同,特别是这些聊天室管理器处理所有聊天室的请求。但是,为一个聊天室设置一个演员会造成单点故障,我想要避免这种情况。此外,这将导致更多的消息,例如:

  • 用户A和用户B在服务器1上。
  • 聊天室X在服务器2上。

如果用户A想与用户B交谈,则两者都必须通过服务器1上的聊天室演员进行通信。

此外,我使用了akka的功能,如(轮循)路由器,在每个系统上创建多个ChatroomManager-actor的实例来处理许多请求。

我花了几天时间设置整个akka远程基础结构,结合序列化和redis。但现在我能够创建任意数量的服务器应用程序实例,它们使用redis共享它们的ActorRef(作为带有IP +端口的绝对路径进行序列化)。

这可能会进一步帮助您,我也可以回答新问题(请勿询问我的英语;)。


1
我必须同意Freed的答案,我可能只是将单点故障的问题转移到了存储方面。当然,分布式缓存是一种改进,但正如Freed所说,并不完美。 实际上,我通过一个已实现的“Storage”-Actor访问redis,该Actor目前处理数据请求(例如获取当前成员列表)。如果我理解正确,对于我的实现来说,这将是避免未来akka集群实现中瓶颈的地方。 - th3hamm0r
你不需要等待Akka集群扩展,可以查看我回答中链接的博客文章。Akka离完全支持集群还有很长的路要走。 - SoftMemes
聊天室成员列表可以在两个服务器上的两个actor中进行容错处理,并将更新发送到两个服务器。请参阅此论文,其中描述了如何使用paxos和聊天室ID的哈希模式来分配主/次要主机以执行此操作。然后,每个客户端websocket都会计算这两个服务器,以便知道要查询的主服务器,否则会回退到辅助服务器,如果该主机崩溃,则会回退到辅助服务器。https://www.dropbox.com/s/iihpq9bjcfver07/VLDB-Paper.pdf(现在称为aerospike的会话缓存服务器显然更好购买,而不是自己编写,但他们的算法和设计非常有启发性)。 - simbo1905
@th3hamm0r,我喜欢你的解决方案。我最近一直使用Akka集群单例解决方案,但仍然存在单点故障问题。 如果您愿意在git存储库或其他地方共享代码以获得灵感,我非常好奇如何用另一种方式解决这个问题。 - Bjarne77

10
跨多台机器扩展的关键是尽可能地隔离可变状态。虽然您可以使用分布式缓存来协调所有节点上的状态,但当扩展到大量节点时,这会导致同步和瓶颈问题。理想情况下,应该有一个单一的actor知道聊天室中的消息和参与者。
核心问题是,如果聊天室由在单台机器上运行的单个actor表示 - 或者确实存在这样的房间。技巧是使用标识符(例如聊天室名称)路由与给定聊天室相关的请求。计算名称的哈希值,并根据数字从n个框中选择一个。节点将了解其当前聊天室,并可以安全地为您查找或创建正确的聊天室actor。
您可以查看以下博客文章,讨论Akka中的群集和扩展问题:

http://blog.softmemes.com/2012/06/16/clustered-akka-building-akka-2-2-today-part-1/

http://blog.softmemes.com/2012/06/16/clustered-akka-building-akka-2-2-today-part-2/


7
我会使用Zookeeper+Norbert来了解哪些主机正在上下线:

http://www.ibm.com/developerworks/library/j-zookeeper/

现在,我的聊天室服务器群中的每个节点都可以知道逻辑集群中的所有主机。当节点离线(或上线)时,它们将得到回调。任何节点现在都可以保持当前群集成员的排序列表,对聊天室ID进行哈希,然后对列表大小进行取模以获取列表中应承载任何给定聊天室的节点的索引。我们可以添加1并重新哈希以选择第二个索引(需要循环直到获得新的索引)来计算用于冗余的第二个承载聊天室的主机。在这两个聊天室主机上,有一个聊天室演员,它只是将所有聊天消息转发给作为聊天室成员的每个Websocket演员。
现在,我们可以通过自定义Akka路由器通过两个活动聊天室演员发送聊天消息。客户端只需发送一次消息,路由器将执行哈希模数并发送到两个远程聊天室演员。我将使用Twitter Snowflake算法为发送的消息生成唯一的64位ID。请参见以下链接中code中nextId()方法中的算法。数据中心ID和workerId可以使用Norbert属性设置,以确保在不同服务器上生成不冲突的ID。

https://github.com/twitter/snowflake/blob/master/src/main/scala/com/twitter/service/snowflake/IdWorker.scala

现在,每个客户端终端都会通过两个活动聊天室角色接收每条消息的两个副本。在每个Websocket客户端角色中,我会取消位掩码以了解发送消息的数据中心ID + workerId号码,并跟踪群集中每个主机看到的最高聊天消息编号。然后,我将忽略任何不高于给定发送方主机的给定客户端已经看到的内容的消息。这将消除通过两个活动聊天室角色传入的一对消息的重复。
到目前为止,一切顺利;如果任何节点死亡,我们不会失去聊天室的一个幸存副本。消息将通过第二个聊天室自动无间断地流动。
接下来我们需要处理节点从集群中掉出或重新加入集群的情况。我们将在每个节点内获取Norbert回调,以通知我们有关集群成员更改的情况。在此回调上,我们可以通过自定义路由器发送一个Akka消息,说明新的成员列表和当前主机名。当前主机上的自定义路由将看到该消息并更新其状态,以了解新的集群成员身份以计算通过哪些节点发送任何给定的聊天室流量的新组合。这种新集群成员身份的确认将由路由器发送到所有节点,以便每个服务器都能跟踪所有服务器已经赶上了成员身份的变化,并且现在正在正确地发送消息。
在成员身份更改后,幸存的聊天室可能仍然处于活动状态。如果是这种情况,所有节点上的所有路由器将继续正常发送给它,但也会向新的第二个聊天室主机发送一条假设性的消息。第二个聊天室可能还没有启动,但这不是问题,因为消息将通过幸存者进行流动。如果存活的聊天室在成员身份更改后不再活动,则所有主机上的所有路由器首先将发送到三个主机;幸存者和两个新节点。可以使用Akka死亡监视机制,以便所有节点最终可以看到幸存的聊天室关闭,并返回通过两个主机路由聊天流量。
接下来,我们需要将聊天室从幸存服务器迁移到一个或两个新的主机上,具体情况取决于实际情况。幸存的聊天室 actor 最终会收到一条消息,告知它有关新群集成员的信息。它将首先向新节点发送聊天室成员的副本。此消息将在新节点上创建具有正确成员身份的新聊天室 actor 副本。如果幸存者不再是应该保留聊天室的两个节点之一,它将进入退役模式。在退役模式下,它仅会将任何消息转发给新的主要和次要节点,而不会转发给任何聊天室成员。Akka 消息转发非常适合这种情况。
退役聊天室将监听每个节点发布的 Norbert 群集成员确认消息。最终,它将看到群集中的所有节点都已确认了新的群集成员。然后它就知道自己将不再接收到任何要转发的消息。此时,它可以自我销毁。使用 Akka 热交换功能非常适合实现退役行为。
到目前为止,我们拥有了一个弹性的消息传递设置,它不会在节点崩溃时丢失消息。当集群成员身份更改时,我们将获得一个内部节点流量的峰值,以将聊天室复制到新节点。我们还有一些残留的内部节点消息转发,直到所有服务器都跟上了哪些聊天室移动到了哪些服务器。如果我们想要扩展系统,我们可以等待用户流量低谷,然后打开一个新节点。聊天室将自动重新分配到新节点。
上述描述是基于阅读以下论文并将其转化为Akka概念:

https://www.dropbox.com/s/iihpq9bjcfver07/VLDB-Paper.pdf


如果向集群添加新节点,则大多数actor将重新平衡到新节点(对吗?)您知道集群重新平衡需要多长时间吗? (如果仅为1个数据中心,例如10个节点。)(actors移动的原因是:“哈希聊天室ID,并通过[集群成员]列表大小进行模运算,以获取列表内应托管任何给定聊天室的节点的索引” - KajMagnus

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接