Akka事件总线教程

17

有没有关于如何在Akka中使用事件总线的好教程或解释? 我已经阅读了Akka文档,但我发现很难理解如何使用事件总线。


http://www.kotancode.com/2014/02/12/using-the-akka-event-bus/ - AnonGeek
1
这种问题在哪里可以找到答案? - Ajay George
kotancode.com的链接已经失效,但是可以在这里找到最后一个版本:https://web.archive.org/web/20170505210732/http://www.kotancode.com/2014/02/12/using-the-akka-event-bus/ - gervais.b
2个回答

44

不确定是否有好的教程,但我可以给你一个快速示例,说明使用事件流可能有帮助的可能用户案例。总体而言,事件流是满足应用程序可能具有的发布/订阅类型要求的有效机制。假设您有一个用例,在该用例中更新系统中用户的余额。由于经常访问余额,因此您决定将其缓存以获得更好的性能。当更新余额时,您还想检查并查看用户是否超过了他们的余额阈值,如果是,则通过电子邮件通知他们。您不希望缓存丢失或余额阈值检查直接绑定到主要余额更新调用,因为它们可能会变得繁重并减慢用户的响应速度。您可以像这样对该特定需求进行建模:

//Message and event classes
case class UpdateAccountBalance(userId:Long, amount:Long)
case class BalanceUpdated(userId:Long)

//Actor that performs account updates
class AccountManager extends Actor{
  val dao = new AccountManagerDao

  def receive = {
    case UpdateAccountBalance(userId, amount) =>
      val res = for(result <- dao.updateBalance(userId, amount)) yield{
        context.system.eventStream.publish(BalanceUpdated(userId))
        result                
      }

      sender ! res
  }
}

//Actor that manages a cache of account balance data
class AccountCacher extends Actor{
  val cache = new AccountCache

  override def preStart = {
    context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated])
  }

  def receive = {
    case BalanceUpdated(userId) =>
      cache.remove(userId)
  }
}

//Actor that checks balance after an update to warn of low balance
class LowBalanceChecker extends Actor{
  val dao = new LowBalanceDao

  override def preStart = {
    context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated])
  }

  def receive = {
    case BalanceUpdated(userId) =>
      for{
        balance <- dao.getBalance(userId)
        theshold <- dao.getBalanceThreshold(userId)
        if (balance < threshold)
      }{
        sendBalanceEmail(userId, balance)
      }
  }
}
在这个例子中,AccountCacherLowBalanceChecker 这两个角色都按类类型订阅了 eventStreamBalanceUpdated 事件。如果该事件被发布到流中,它将被这两个角色实例接收。然后,在 AccountManager 中,当余额更新成功时,它会为用户引发一个 BalanceUpdated 事件。当这种情况发生时,在并行处理的同时,该消息会被传递到 AccountCacherLowBalanceChecker 的邮箱中,导致缓存中的余额被清除,并检查账户阈值,可能会发送一封电子邮件。
现在,你可以直接向这另外两个角色放置tell (!)调用,但是有人可能会认为这样做会过于紧密地耦合了余额更新的这两个“副作用”,而且这些细节不一定属于 AccountManager。 如果您有一个条件可能会导致一些其他事情发生(检查、更新等),这些事情纯粹是作为副作用发生的(不是核心业务流本身的一部分),那么事件流可能是一种很好的方法,用于解耦引发事件和可能需要对该事件做出反应的人。

1
ActorSystem 已经为其创建了事件总线,无需自己创建。由于 ActorSystem 创建总线,我认为根 guardian 负责总线。我不确定您在第三个问题中的意思是什么,您能再解释一下吗? - cmbaxter
1
我再次阅读了文档,似乎我误解了自己的问题。我的意思是上面例子中的演员被订阅以接收某个消息(BalanceUpdated)。那么,我该如何订阅一个主题,可以发送多种消息给演员呢? - Tsume
1
这个答案非常有用。它与我在阅读Akka文档后开始写的内容非常不同 - 所以尽管这个问题已经关闭,我认为这是一个好问题。 - Mark Butler
谢谢您提供的好解释,但我想问一件事:在actor监听事件(LowBalanceChecker)的情况下发送邮件不会在事件重放的情况下变得危险吗?如何避免这种情况-我们应该在发布事件之前发送邮件吗? - hi_my_name_is
@freakman,这并不是从业务逻辑角度来看一个完全成熟的逻辑集。它只是为了演示如何使用事件总线。如果您要发送真正的电子邮件,您很可能会进行额外的检查,以确保您不会在y时间间隔内向同一人发送相同的电子邮件超过x次(在这种情况下可能是1天1次)。 - cmbaxter
显示剩余4条评论

11

每个ActorSystem都有一个名为事件流EventBus。您可以通过调用system.eventStream来获取此EventBus

ActorSystem使用事件流进行许多操作,包括日志记录, 发送死信集群事件

您还可以使用事件流来满足自己的发布/订阅需求。例如,在测试期间,事件流可以很有用。将Test KittestActor订阅到某些事件(例如日志事件)的事件流中,然后您可以expect它们。当发生某些事件时,您不需要向另一个actor发送消息,但仍然需要在测试中期望该事件时,这可能特别有用。
请注意,事件流仅适用于一个ActorSystem。如果您正在使用在流上发布的远程事件,则默认情况下不会跨越到远程系统(尽管您可以自行添加该支持)。
理论上,如果您不想使用事件流,可以创建单独的EventBus
Akka 2.2正在编写更好的事件总线文档,因此当this ticket完成时,请再次查看。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接