Akka:动态添加Actor到BroadcastRouter

3

如何正确地使广播Actor能够动态添加/移除路由器?

问题背景: 一个Actor监听特定商品价格的变化, 然后将价格变化广播给所有其他的Actor(路由器),它们根据内部规则进行操作(例如如果价格为X则买入或卖出)。

我对Akka还很陌生,但是在阅读了文档之后,我相信我已经找出了所需的组件,但如果您认为我的设计或使用的组件不正确,请评论或回答。

我想从固定的路由器列表切换到可动态添加/移除路由器的方案。

        ActorRef actor1 = system.actorOf(new Props(LimitOrderActor.class));
        ActorRef actor2 = system.actorOf(new Props(LimitOrderActor.class));
        ActorRef actor3 = system.actorOf(new Props(LimitOrderActor.class));
        Iterable<ActorRef> routees = Arrays.asList(new ActorRef[] { actor1, actor2, actor3 });
        ActorRef actorBroadcastRouter1 = system.actorOf(new Props(TickerWatcherActor.class).withRouter(BroadcastRouter.create(routees)), "router1");

将其转换为动态大小的BroadcastRouter,其中actor是在BroadcastRouter启动后创建的。

        int lowerBound = 1;
        int upperBound = 10000;
        DefaultResizer resizer = new DefaultResizer(lowerBound, upperBound);
        BroadcastRouter broadcastRouter2 = new BroadcastRouter(resizer);
        ActorRef actorBroadcastRouter2 = system.actorOf(new Props(TickerWatcherActor.class).withRouter(broadcastRouter2), "router2");

        ActorRef actor4 = system.actorOf(new Props(LimitOrderActor.class).withRouter((RouterConfig) broadcastRouter2));
        ActorRef actor5 = system.actorOf(new Props(LimitOrderActor.class).withRouter((RouterConfig) broadcastRouter2));
        ActorRef actor6 = system.actorOf(new Props(LimitOrderActor.class).withRouter((RouterConfig) broadcastRouter2));

目前,演员“actorBroadcastRouter2”正在接收消息,而不是预期的LimitOrderActor演员4、5和6。我做错了什么?

编辑:我相信现在我要寻找的是事件总线而不是广播路由器。

   final ActorRef actor = system.actorOf(new Props(LimitOrderActor.class));
   system.eventStream().subscribe(actor, String.class);
2个回答

0
使用事件总线而不是广播路由器:
final ActorRef actor = system.actorOf(new Props(LimitOrderActor.class));
system.eventStream().subscribe(actor, String.class);

0

让我们关闭这个问题,我也曾遇到同样的问题。这不是广播路由器的工作方式。您应该在actor4actor5actor6中创建新的广播路由器,而不是添加到actorBroadcastRouter2中现有的广播路由器。

在Scala中,使用Listeners trait非常简单,但很遗憾,在使用Java编写Akka时无法使用它:

class MyActor extends Actor with Listeners {
  def receive = {
     case yourmessages => gossip("To All my listeners")
  } orElse listenerManagement
}

val myActor = context.actorOf(Props[MyActor])

myActor ! Listen(someActorRef)

myActor ! "pigdog"

必须编写自己的Java版本或使用DistributedPubSubExtension进行更全面的操作。

https://groups.google.com/forum/?fromgroups=#!topic/akka-user/NEOY9IxRW5I

https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/routing/Listeners.scala

http://doc.akka.io/docs/akka/snapshot/contrib/distributed-pub-sub.html#a-small-example-in-java


我使用EventStream编写了自己的Java版本。DistributedPubSubExtension在当前的2.1.2稳定版本中已经可用吗?我认为它仍在开发中。 - André Ricardo
我没有使用DistributedPubSubExtension,所以无法确定。这是因为它会将订阅信息复制到每个节点,但我更需要对订阅信息进行分片。据我所知,开箱即用的死节点检测功能是不可用的。 - Georg

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接