如何检查Akka actor是否存在(akka 2.2)?

21

我有一个Java对象,它不是Actor,但可以使用actorSelection(Path)从actor系统中选择Actor。有可能选定的Actor在系统中不存在。

在Java API中,ActorSelection没有ask()方法,因此我不能向ActorSelection发送Identify消息并使用响应的发送者。

我尝试通过ActorSelection向Actor发送消息并通过deadletter做出反应来解决问题。但我没有收到任何deadletters。

如何使用ActorSelection检查Actor是否存活或不存在?

ActorSystem system = ActorSystem.create("test");

//create test actor
system.actorOf(Props.create(TestActor.class), "testActor");

//add dead letter listener to the system
ActorRef eventBusActor = asys.actorOf(Props.create(EventBusActor.class), "eventbusactor");
system.eventStream().subscribe(eventBusActor, DeadLetter.class);


//This works. The test actor receives the message      
ActorSelection a1 = asys.actorSelection("/user/testActor");
a1.tell("hello", ActorRef.noSender());

//This does not work and does not send dead letters      
ActorSelection a2 = asys.actorSelection("/user/doesnotexist");
a2.tell("hello", ActorRef.noSender());

//Does not compile, because ask needs an ActorRef as first argument
ActorSelection a3 = asys.actorSelection("/user/test");
Future f = Patterns.ask(a3, new Identify(), 1000);

抱歉,那是一个疏忽,感谢您指出:https://www.assembla.com/spaces/akka/simple_planner#/ticket:3532 - Roland Kuhn
5个回答

25

我最近发现了ActorSelection.resolveOne方法:

val name = "myActor"
implicit val timeout = 5000 // Timeout for the resolveOne call
system.actorSelection(name).resolveOne().onComplete {
  case Success(actor) => actor ! message

  case Failure(ex) =>
    val actor = system.actorOf(Props(classOf[ActorClass]), name)
    actor ! message
}

我目前正在调查的一个问题是,定义此方法的方式可能会被同时调用(来自其他actor)。因此,如果resolveOne调用失败,因为actor仍在创建过程中,可能会出现竞争条件,导致尝试创建两次actor。这可能对您的使用情况有或没有影响。


1
如果您正在尝试访问子操作,我猜您可以在Actor上下文中使用“def child(name: String): Option [ActorRef]”方法。这应该有助于并发问题。还请参见:https://dev59.com/8XHYa4cB1Zd3GeqPMIG- - skytteren

13

看起来Akka在java api中没有支持ActorSelection用于ask。我稍微尝试了一下代码,找到了一个可行的方法。请看看以下代码是否适用:

import java.util.concurrent.TimeUnit;

import scala.concurrent.Await;
import scala.concurrent.Future;

import akka.actor.ActorIdentity;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Identify;
import akka.actor.Props;
import akka.pattern.AskableActorSelection;
import akka.util.Timeout;

public class AskTest {

  public static void main(String[] args) throws Exception{
    ActorSystem sys = ActorSystem.apply("test");
    sys.actorOf(Props.create(TestActor.class), "mytest");

    ActorSelection sel = sys.actorSelection("/user/mytest");

    Timeout t = new Timeout(5, TimeUnit.SECONDS);
    AskableActorSelection asker = new AskableActorSelection(sel);
    Future<Object> fut = asker.ask(new Identify(1), t);
    ActorIdentity ident = (ActorIdentity)Await.result(fut, t.duration());
    ActorRef ref = ident.getRef();
    System.out.println(ref == null);
  }
}

我刚刚研究了scala的ask支持,并通过java进行了连接。这对我有用,希望对你也有帮助。


7
Akka提供了一种功能,可以使用特殊的消息IdentifyActorSelection获取ActorRef。您不必使用ask()来发送此消息。只需将一个Identify消息传递给ActorSelection,并监听将返回给您的ActorIdentity消息即可。在Akka文档中有一个完全符合此要求的示例:通过Actor Selection识别Actor (Java) 以下是修改过的示例代码:
final String identifyId = "1";

@Override
public void onReceive(Object message) {
    if (message instanceof ActorIdentity) {
        ActorIdentity identity = (ActorIdentity) message; 
        if (identity.correlationId().equals(identifyId)) {
            ActorRef ref = identity.getRef();
            if (ref == null)
                // Actor does not exist
            else {
                // Actor does exist
            }
        }
     }
}

文档中还有一张非常不错的图示,展示了ActorPath、ActorSelection和Actor Lifecycle之间的关系。


我需要从一个普通的Java对象中识别出演员,而不是从另一个演员中识别,所以我不能使用你的建议。不幸的是,无法将此对象更改为演员。 - schrums
啊,我明白了,我忽略了那个限制。也许你可以在你的Java对象内部生成一个小的辅助Actor,专门用于发送和接收Identify消息。 - Björn Jacobs
步骤如下:1)创建一个辅助Actor(这样你就有了一个ActorRef),2)使用ask()方法发送一条包含要测试的ActorSelection所有信息的消息,3)在消息到达Actor时返回结果。它可能会起作用... ;) - Björn Jacobs

4
作为其他答案中提到的,ActorSelection.resolveOne()可以处理这个问题。
需要注意的是:在内部,这个方法会向涉及到的actor发送一个消息。也就是说,如果该actor正在忙碌中,它不会回复,这将导致操作失败(超时)。
在纯粹的Akka最佳实践中,这可能只是个边角情况。但在更混合了普通Java和Akka的环境下,这很容易出现问题。特别是,位于一个actor线程之内的代码无法找到对该actor的引用。

1

使用2.3.4版本

一些Scala示例,可能会有所帮助

  val zed2 = Akka.system().actorSelection("path")
  val fs:FiniteDuration = (100).millis

  val x = zed2.resolveOne(fs).value
  if (x.isDefined){
    println(x.get.isFailure)
  }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接