有没有关于如何在Akka中使用事件总线的好教程或解释? 我已经阅读了Akka文档,但我发现很难理解如何使用事件总线。
有没有关于如何在Akka中使用事件总线的好教程或解释? 我已经阅读了Akka文档,但我发现很难理解如何使用事件总线。
不确定是否有好的教程,但我可以给你一个快速示例,说明使用事件流可能有帮助的可能用户案例。总体而言,事件流是满足应用程序可能具有的发布/订阅类型要求的有效机制。假设您有一个用例,在该用例中更新系统中用户的余额。由于经常访问余额,因此您决定将其缓存以获得更好的性能。当更新余额时,您还想检查并查看用户是否超过了他们的余额阈值,如果是,则通过电子邮件通知他们。您不希望缓存丢失或余额阈值检查直接绑定到主要余额更新调用,因为它们可能会变得繁重并减慢用户的响应速度。您可以像这样对该特定需求进行建模:
//Message and event classes
case class UpdateAccountBalance(userId:Long, amount:Long)
case class BalanceUpdated(userId:Long)
//Actor that performs account updates
class AccountManager extends Actor{
val dao = new AccountManagerDao
def receive = {
case UpdateAccountBalance(userId, amount) =>
val res = for(result <- dao.updateBalance(userId, amount)) yield{
context.system.eventStream.publish(BalanceUpdated(userId))
result
}
sender ! res
}
}
//Actor that manages a cache of account balance data
class AccountCacher extends Actor{
val cache = new AccountCache
override def preStart = {
context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated])
}
def receive = {
case BalanceUpdated(userId) =>
cache.remove(userId)
}
}
//Actor that checks balance after an update to warn of low balance
class LowBalanceChecker extends Actor{
val dao = new LowBalanceDao
override def preStart = {
context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated])
}
def receive = {
case BalanceUpdated(userId) =>
for{
balance <- dao.getBalance(userId)
theshold <- dao.getBalanceThreshold(userId)
if (balance < threshold)
}{
sendBalanceEmail(userId, balance)
}
}
}
在这个例子中,AccountCacher
和 LowBalanceChecker
这两个角色都按类类型订阅了 eventStream
的 BalanceUpdated
事件。如果该事件被发布到流中,它将被这两个角色实例接收。然后,在 AccountManager
中,当余额更新成功时,它会为用户引发一个 BalanceUpdated
事件。当这种情况发生时,在并行处理的同时,该消息会被传递到 AccountCacher
和 LowBalanceChecker
的邮箱中,导致缓存中的余额被清除,并检查账户阈值,可能会发送一封电子邮件。tell (!)
调用,但是有人可能会认为这样做会过于紧密地耦合了余额更新的这两个“副作用”,而且这些细节不一定属于 AccountManager
。 如果您有一个条件可能会导致一些其他事情发生(检查、更新等),这些事情纯粹是作为副作用发生的(不是核心业务流本身的一部分),那么事件流可能是一种很好的方法,用于解耦引发事件和可能需要对该事件做出反应的人。ActorSystem
已经为其创建了事件总线,无需自己创建。由于 ActorSystem
创建总线,我认为根 guardian 负责总线。我不确定您在第三个问题中的意思是什么,您能再解释一下吗? - cmbaxter每个ActorSystem
都有一个名为事件流的EventBus
。您可以通过调用system.eventStream
来获取此EventBus
。
ActorSystem
使用事件流进行许多操作,包括日志记录, 发送死信和集群事件。
testActor
订阅到某些事件(例如日志事件)的事件流中,然后您可以expect
它们。当发生某些事件时,您不需要向另一个actor发送消息,但仍然需要在测试中期望该事件时,这可能特别有用。ActorSystem
。如果您正在使用在流上发布的远程事件,则默认情况下不会跨越到远程系统(尽管您可以自行添加该支持)。EventBus
。