我有一个使用ProcessBuilder执行外部进程的actor:...我有几百个这样的actors并行运行...每个案例只需要几毫秒就能完成,所以同时进行非常繁重的处理。 并发处理机制按资源使用、可扩展性和性能从差到好排名如下:
- process=重量级
- thread=中等重量级(数十个线程可以在单个进程空间内执行)
- actor=轻量级(数十个演员可以通过利用单个共享线程或多个共享线程来执行)
同时启动许多进程需要大量操作系统资源——用于进程创建和终止。 在极端情况下,启动和结束进程的操作系统开销可能消耗比实际工作执行更多的CPU和内存资源。 这就是为什么创建了线程模型(更有效的actor模型)。 将当前处理视为从极易扩展的actor内部进行“类CGI”的不可扩展的操作系统负担加工处理的结果,这是一种反模式。 有些操作系统很容易被压垮:这可能正在发生。
此外,如果要读取的文件非常大,则为了可扩展性和可靠性,最好限制同时在同一磁盘上读取文件的进程数量。最多可以允许10个进程并发读取,但不确定100个进程是否可行。
演员应该如何调用外部程序?
当然,如果您将myExecutable.sh中的逻辑转换为Scala,则根本不需要创建进程。这样做更容易实现可伸缩性、性能和可靠性。
假设这不是可能或不可取的,您应该限制创建的进程总数,并在不同的Actor/请求之间重复使用它们。
第一个解决方案选项:(1)创建一个重复使用的进程池(大小为10)(2)创建演员(大小为100),通过ProcessIO与进程进行通信(3)如果所有进程都忙于处理,则演员阻塞直到有一个进程可用。这种选择的问题在于复杂性;当进程成为瓶颈时,100个演员必须执行与进程池交互的工作,而演员本身增加的价值很小。
更好的解决方案选项:(1)创建有限数量的演员(例如10个)(2)每个演员创建1个私有的长时间运行的进程(即没有池)(3)每个演员通过ProcessIO进行通信,如果进程繁忙,则阻塞。问题:仍然不够简单;演员与阻塞进程的交互效果不佳。
最佳解决方案选项:(1)没有演员,从主线程中的简单for循环将实现与演员相同的效益(2)创建有限数量的进程(10个)(3)通过for循环,使用ProcessIO依次交互每个进程(如果繁忙-阻塞或跳过下一次迭代)
“是否有任何方法设置此进程的超时时间并在超时后重试?”
确实有这个功能。actors最强大的特点之一是某些actor能够生成其他actor,同时作为它们的监督者(接收失败或超时消息,恢复/重新启动)。使用“原生scala actors”可以通过简单的编程完成,生成自己的检查和超时消息。但我不会涉及到这个,因为Akka方法更加强大且更简单。此外,下一个Scala主要版本(2.11)将采用Akka作为支持的actor模型,而“原生scala actors”已经过时
deprecated。
这里是一个示例Akka监督actor,带有自动化超时/重新启动的程序(未编译/测试)。当然,如果您选择第三种解决方案选项,这不会有什么用处:
import scala.concurrent.duration._
import scala.collection.immutable.Set
class Supervisor extends Actor {
override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
case _: ArithmeticException => Resume
case _: NullPointerException => Restart
case _: IllegalArgumentException => Stop
case _: Exception => Escalate
}
val worker = context.actorOf(Props[Worker])
var pendingRequests = Set.empty[WorkerRequest]
def receive = {
case req: WorkRequest(sender, jobReq) =>
pendingRequests = pendingRequests + req
worker ! req
system.scheduler.scheduleOnce(10 seconds, self, WorkTimeout(req))
case resp: WorkResponse(req @ WorkRequest(sender, jobReq), jobResp) =>
pendingRequests = pendingRequests - req
sender ! resp
case timeout: WorkTimeout(req) =>
if (pendingRequests get req != None) {
worker restart
pendingRequests foreach{ worker ! _ }
}
}
}
警告:这种对于执行者监督的方法不能克服不良架构和设计。如果您从适合您需求的进程/线程/执行者设计开始,那么监督将有助于提高可靠性。但是,如果您从不良设计开始,则使用“强制恢复”来自操作系统级别故障的风险可能会加剧您的问题-使进程的可靠性变差甚至导致计算机崩溃。