如何在不阻塞的情况下使用Akka ask模式

3

你好,我有一个演员负责从数据库中获取数据,将其转换为列表并将其发送回发件人。我使用ask模式接收来自我的演员的响应,因为我不想使用await.result,因为这种方法会阻塞线程,这是不好的。

我编写了以下代码来从我的演员获得响应(我跳过了db代码以保持简单,但逻辑仍然相同):

class MyActor extends Actor{
   var list = new MutableList[Int]()
   list+=1
   list+=2
   list+=3
   list+=4

   def receive ={
     case "returnAlist"=>
       println("sending back list "+list +"of size "+list.size)
       sender ! list
     case message => 
       unhandled(message)
   }
}

object Test extends App{
  val system = ActorSystem("testing")
  val MyActor = system.actorOf(Props[MyActor], name = "InstitutionUserSuggestion")
  implicit val timeout = Timeout(15 seconds)
  var ResultList = new MutableList[Int]()
  val future:Future[MutableList[Int]] = ask(MyActor,"returnAlist").mapTo[MutableList[Int]] 
  future.onComplete { 
    case Success(result)=>
      println(" in sucees start")
      println("value of result "+ result.size)
      println("ResultList=result")
      ResultList = result
      println("value of ResultList "+ ResultList.size)
      println("in Success end")
    case Failure(e)=>
      println(" in failure")
      e.printStackTrace()
  }

  println("returned list size is " + ResultList.size + " and its contents" + ResultList)


}

以下是上述代码的输出结果:
sending back list MutableList(1, 2, 3, 4)of size 4
returned list size is 0 and its contenstsMutableList()
in sucees start
value of result 4
 ResultList=result
value of ResultList 4
 in Success end

我的问题是onComplete代码在结束时执行,我需要将我的列表ResultList的内容显示在控制台上,但是这行代码
println("returned list size is "+ResultList.size +" and its contensts" +ResultList)

我在onComplete块之后编写的代码,但我认为它在onComplete块之前执行,因此最终ResultList列表中没有任何内容,因此我无法将其项目打印到控制台。

请帮帮我,我该怎么做才能在不阻塞的情况下从actor接收项目列表,并在控制台上显示它们。

1个回答

2

您不希望通过等待演员的响应而阻塞,因此您正在正确使用Future。当您的演员回复列表时,onComplete函数中的代码将被执行。

由于您不想阻塞并以异步方式处理它,因此您的最后一个println语句在您的演员尚未响应时执行。

您可以在onComplete块中打印列表的内容:

object Test extends App{
  val system = ActorSystem("testing")
  val MyActor = system.actorOf(Props[MyActor], name = "InstitutionUserSuggestion")
  implicit val timeout = Timeout(15 seconds)

  val future: Future[List[Int]] = ask(MyActor,"returnAlist").mapTo[List[Int]] 
  future.onComplete { 
    case Success(result)=>
      println("returned list size is " + result.size +" and its contents" + result)
    case Failure(e)=>
      println("in failure")
      e.printStackTrace()
  }
}

作为一个附注,我使用了不可变的List,这比使用可变集合,如MutableList更符合Scala的惯用法。

但是如果我想在onComplete块之后打印列表的内容,我该怎么办? - swaheed
这是不可能的,除非使用Await.result。你可以将它写在Future.onComplete块中,或者使用Await.result来阻塞并等待结果,然后再打印它。 - Peter Neyens
好的,谢谢。最后一个问题,我正在使用onComplete的ask模式,但仍然需要提供超时,为什么会这样? - swaheed
因为不想永远等待演员的响应。如果演员在隐式的 timeout 内没有响应,那么 Future 将变成一个 Failure。更多信息请参见 akka 文档 - Peter Neyens
那么,即使设置了15秒的超时时间,我的代码也不会阻塞,对吗? - swaheed
让我们在聊天中继续这个讨论 - Peter Neyens

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接