在Akka Actors中如何阻塞调用

30
作为一名新手,我正在努力理解演员是如何工作的。从文档中,我认为我理解了演员是在同步模式下执行的对象,并且演员执行可以包含阻塞/同步方法调用,例如数据库请求。
但我不明白的是,如果你编写一个包含某些阻塞调用(如阻塞查询执行)的演员,它会破坏整个线程池(从CPU利用率等方面来看),对吗?我的意思是,根据我的理解,如果/当演员进行阻塞调用时,JVM无法理解它是否可以将该线程切换到其他人身上。
因此,鉴于并发性的本质,演员应该永远不会执行任何阻塞调用,这不是显而易见的吗?
如果是这样,那么完成非阻塞/异步调用的推荐方式是什么,比如调用Web服务以获取某些内容,并在请求完成时向另一个演员发送消息?我们应该像在演员内部使用以下代码一样简单地实现:
future map { response => x ! response.body }
这是处理此操作的正确方式吗?
如果您能为我澄清这些问题,我将不胜感激。

5
http://doc.akka.io/docs/akka/2.2.3/general/actor-systems.html#Blocking_Needs_Careful_Management - Viktor Klang
3个回答

17

真正伟大的介绍"The Neophyte's Guide to Scala Part 14: The Actor Approach to Concurrency" http://danielwestheide.com/blog/2013/02/27/the-neophytes-guide-to-scala-part-14-the-actor-approach-to-concurrency.html

Actor 接收消息,将阻塞代码包装到 Future 中,在它的 Future.onSuccess 方法中 - 使用其他异步消息发送结果。但要注意 sender 变量可能会改变,所以要关闭它(在 future 对象中创建一个本地引用)。

p.s.:The Neophyte's Guide to Scala - 真正很棒的书。

更新:(添加示例代码)

我们有工人和经理。经理设置要完成的工作,工人报告“收到”,并开始长时间的过程(休眠1000)。同时,系统用消息“alive”向经理发送 ping,并向工人发送 ping。完成工作时 - 工人通知经理。

NB:执行 sleep 1000 在导入的“默认/全局”线程池执行器中完成 - 您可能会遇到线程饥饿。 NB:val commander = sender 需要“关闭”对原始 sender 的引用,因为当 onSuccess 将被执行时 - actor 中的当前 sender 可能已经设置为其他 'sender' ...

日志:

01:35:12:632 Humming ...
01:35:12:633 manager: flush sent
01:35:12:633 worker: got command
01:35:12:633 manager alive
01:35:12:633 manager alive
01:35:12:633 manager alive
01:35:12:660 worker: started
01:35:12:662 worker: alive
01:35:12:662 manager: resource allocated
01:35:12:662 worker: alive
01:35:12:662 worker: alive
01:35:13:661 worker: done
01:35:13:663 manager: work is done
01:35:17:633 Shutdown!

代码:

import akka.actor.{Props, ActorSystem, ActorRef, Actor}
import com.typesafe.config.ConfigFactory
import java.text.SimpleDateFormat
import java.util.Date
import scala.concurrent._
import ExecutionContext.Implicits.global

object Sample {

  private val fmt = new SimpleDateFormat("HH:mm:ss:SSS")

  def printWithTime(msg: String) = {
    println(fmt.format(new Date()) + " " + msg)
  }

  class WorkerActor extends Actor {
    protected def receive = {
      case "now" =>
        val commander = sender
        printWithTime("worker: got command")
        future {
          printWithTime("worker: started")
          Thread.sleep(1000)
          printWithTime("worker: done")
        }(ExecutionContext.Implicits.global) onSuccess {
          // here commander = original sender who requested the start of the future
          case _ => commander ! "done" 
        }
        commander ! "working"
      case "alive?" =>
        printWithTime("worker: alive")
    }
  }

  class ManagerActor(worker: ActorRef) extends Actor {
    protected def receive = {
      case "do" =>
        worker ! "now"
        printWithTime("manager: flush sent")
      case "working" =>
        printWithTime("manager: resource allocated")
      case "done" =>
        printWithTime("manager: work is done")
      case "alive?" =>
        printWithTime("manager alive")
        worker ! "alive?"
    }
  }

  def main(args: Array[String]) {

    val config = ConfigFactory.parseString("" +
      "akka.loglevel=DEBUG\n" +
      "akka.debug.lifecycle=on\n" +
      "akka.debug.receive=on\n" +
      "akka.debug.event-stream=on\n" +
      "akka.debug.unhandled=on\n" +
      ""
    )

    val system = ActorSystem("mine", config)
    val actor1 = system.actorOf(Props[WorkerActor], "worker")
    val actor2 = system.actorOf(Props(new ManagerActor(actor1)), "manager")

    actor2 ! "do"
    actor2 ! "alive?"
    actor2 ! "alive?"
    actor2 ! "alive?"

    printWithTime("Humming ...")
    Thread.sleep(5000)
    printWithTime("Shutdown!")
    system.shutdown()

  }
}

只是一个快速的问题,即使您在Future中关闭了一个变量,它可能需要一些时间才能进入该Future代码(例如,在线程饥饿的情况下)并关闭该变量。如果actor接收到另一条消息并在future有机会关闭该变量之前更新了已关闭的变量,那怎么办? - Aysu Dogma
闭包中的close,不是像inputStream.close()那样的关闭。 - ya_pulser
啊,我明白了。我不明白的是对象作用域和消息处理程序作用域之间的区别,你的示例让这一点非常清楚。谢谢... - Aysu Dogma
与上面的解决方案相比,我更倾向于将阻塞代码包装在“Future”中,并在自定义线程池中运行。这样做,我可以更好地控制线程数量,并且如果请求更多的阻塞调用并且阻塞调用需要更长时间(> 1秒),它也不会导致饥饿。当然,自定义线程池具有合理的大小来管理阻塞调用的工作量。我的想法有什么遗漏吗? - peterschrott
1
@peedeeX21 我使用了和你一模一样的方法: "... case "now" => future { Thread.sleep(1000) }(ExecutionContext.Implicits.global) ...",但是在这个例子中我懒得重新创建线程了,应该像你建议的那样,所有耗时的工作都应该在单独的线程上运行。 - ya_pulser
显示剩余2条评论

17

这真的取决于使用情况。如果查询不需要串行化,则可以在未来执行查询并将结果发送回发送者,方法如下:

import scala.concurrent.{ future, blocking}
import akka.pattern.pipe

val resFut = future {
  blocking {
    executeQuery()
  }
}

resFut pipeTo sender

你也可以创建一个专门用于处理数据库调用的调度程序,并使用路由器来创建Actor。 这样,您还可以轻松地限制并发数据库请求的数量。


阻塞代码是否受执行上下文中线程数量的限制? - Jas
5
使用“blocking”来包围代码允许运行时调整其行为,并将阻塞线程从线程池中移除,以允许线程池启动新线程,从而防止线程池耗尽。 - drexin
这是 pipeTo 的预期用法吗?我能找到的唯一文档说,当您需要发送 akka ask 的结果时应使用它。但是,使用(非 akka)阻塞操作肯定会耗尽默认调度程序的线程池,对吗? - Matt
pipeTo 在 future 上注册一个回调,因此它不会阻塞线程。 - drexin

1

如果你考虑在Akka中进行阻塞调用,那么考虑线程池是正确的。你进行的阻塞越多,你需要的线程池就越大。一个完全非阻塞的系统只需要一个与你的机器CPU核心数相等的线程池。参考配置使用了一个线程池,它的大小是机器CPU核心数的3倍,以便处理一些阻塞情况:

    # The core pool size factor is used to determine thread pool core size
    # using the following formula: ceil(available processors * factor).
    # Resulting size is then bounded by the core-pool-size-min and
    # core-pool-size-max values.
    core-pool-size-factor = 3.0

源代码

但如果您需要进行更多的阻塞操作,或者为阻塞调用创建一个具有更高fork-join-executor.core-pool-size-factor的非默认分发程序,则可能需要将akka.default-dispatcher.fork-join-executor.core-pool-size-factor增加到更高的数字。

关于在Akka中进行阻塞调用的最佳方法。我建议通过创建执行阻塞调用的多个actor实例并在它们前面放置路由器来扩展它们,以使它们对您应用程序的其余部分看起来像单个actor。


1
我建议使用单独的线程池来执行阻塞IO操作。它们可以像上面的示例一样包装在Future中。 - binarytemple_picsolve

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接