Akka/Scala:映射Future vs pipeTo

15
Akka actors中,将Future结果发送给另一个actor时,通过以下方式是否存在线程数或线程锁定方面的差异:
A. 将Future映射到函数,该函数将结果告诉actor。
B. 在future上定义onSuccess回调,将结果告诉actor。
C. 使用pipeToFuture结果传送给actor。
其中一些选项在先前的问题中已经讨论过: Akka:向Actor发送未来消息 哪种是首选的方法,为什么?
此外,我想知道如果receive应该是Any => Unit类型,那么为什么在某些情况下,receive的部分函数返回Future而不是Unit,代码仍然可以编译?
这里是我提到的三个选项的代码示例:
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import akka.pattern.pipe

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Success

class ActorIncrement extends Actor {

  def receive = {
    case i: Int =>
      println(s"increment $i")
      sender ! i + 1
  }
}

class ActorEven extends Actor {

  def receive = {
    case i: Int =>
      println(s"$i is even")
  }
}


class ActorOdd extends Actor {

  def receive = {
    case i: Int =>
      println(s"$i is odd")
  }
}

class MyActor(actorIncrement: ActorRef, actorEven: ActorRef, actorOdd: ActorRef) extends Actor {
  import scala.concurrent.ExecutionContext.Implicits.global

  implicit val timeout = Timeout(5 seconds)

  def receive = {
    case i: Int if i % 2 == 0 =>
      println(s"receive a: $i")
      actorIncrement ? i map {
        case j: Int =>
          println(s"$j from increment a")
          actorOdd ! j
      }
    case i: Int =>
      println(s"receive b: $i")
      val future: Future[Any] = actorIncrement ? i
      future onSuccess {
        case i: Int =>
          println(s"$i from increment b")
          actorEven ! i
      }

    case s: String =>
      println(s"receive c: $s")
      (actorIncrement ? s.toInt).mapTo[Int] filter(_ % 2 == 0) andThen { case Success(i: Int) => println(s"$i from increment c") } pipeTo actorEven
  }
}

object TalkToActor extends App {

  // Create the 'talk-to-actor' actor system
  val system = ActorSystem("talk-to-actor")

  val actorIncrement = system.actorOf(Props[ActorIncrement], "actorIncrement")
  val actorEven = system.actorOf(Props[ActorEven], "actorEven")
  val actorOdd = system.actorOf(Props[ActorOdd], "actorOdd")

  val myActor = system.actorOf(Props(new MyActor(actorIncrement, actorEven, actorOdd)), "myActor")

  myActor ! 2
  myActor ! 7
  myActor ! "11"

  Thread.sleep(1000)

  //shutdown system
  system.terminate()
}
1个回答

17
如果您查看 akka.pattern.PipeToSupport 中如何定义pipeTo
def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = 
  Actor.noSender): Future[T] = {
    future andThen {
      case Success(r) ⇒ recipient ! r
      case Failure(f) ⇒ recipient ! Status.Failure(f)
    }
  }
}

如您所见... pipeTo 不过是在将 Future 中的结果或 Status.Failure 消息发送到特定 actor 时,添加了一个 andThen 调用而已。

主要的区别在于对 Status.Failure 错误处理上。如果您不使用 pipeTo ,则可以按照自己的方式处理错误。


谢谢。你有什么想法,为什么接受 Any => Future 而期望的是 Any => Unit - rapt
2
这被称为值丢弃:https://dev59.com/zlgR5IYBdhLWcg3whdfv#41239759 - rethab
@rethab 很好。谢谢! - rapt

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接