Scala系统进程挂起

3

我有一个演员使用ProcessBuilder来执行外部进程:

  def act {
    while (true) {
      receive {
        case param: String => {
           val filePaths = Seq("/tmp/file1","/tmp/file2")
           val fileList = new ByteArrayInputStream(filePaths.mkString("\n").getBytes())
           val output = s"myExecutable.sh ${param}" #< fileList !!<

           doSomethingWith(output)
        }
      }
    }
  }

我同时运行着数百个actor。有时候,由于未知原因,进程的执行永远不会返回。它会一直挂起。这个特定的actor无法处理新消息。是否有办法设置一个超时时间来使进程返回,并在超时后进行重试?
为什么这些命令不能持续几毫秒以上?这可能是执行被挂起的原因。
编辑1: 我观察到两个重要事实: 1. 这个问题只出现在Linux上,而不是Max OS X。 2. 当我不使用ByteArrayInputStream作为执行的输入时,程序就不会挂起。

https://issues.scala-lang.org/browse/SI-8406? - som-snytt
4个回答

3
我有一个使用ProcessBuilder执行外部进程的actor:...我有几百个这样的actors并行运行...每个案例只需要几毫秒就能完成,所以同时进行非常繁重的处理。 并发处理机制按资源使用、可扩展性和性能从差到好排名如下:
  1. process=重量级
  2. thread=中等重量级(数十个线程可以在单个进程空间内执行)
  3. actor=轻量级(数十个演员可以通过利用单个共享线程或多个共享线程来执行)
同时启动许多进程需要大量操作系统资源——用于进程创建和终止。 在极端情况下,启动和结束进程的操作系统开销可能消耗比实际工作执行更多的CPU和内存资源。 这就是为什么创建了线程模型(更有效的actor模型)。 将当前处理视为从极易扩展的actor内部进行“类CGI”的不可扩展的操作系统负担加工处理的结果,这是一种反模式。 有些操作系统很容易被压垮:这可能正在发生。
此外,如果要读取的文件非常大,则为了可扩展性和可靠性,最好限制同时在同一磁盘上读取文件的进程数量。最多可以允许10个进程并发读取,但不确定100个进程是否可行。
演员应该如何调用外部程序?
当然,如果您将myExecutable.sh中的逻辑转换为Scala,则根本不需要创建进程。这样做更容易实现可伸缩性、性能和可靠性。
假设这不是可能或不可取的,您应该限制创建的进程总数,并在不同的Actor/请求之间重复使用它们。
第一个解决方案选项:(1)创建一个重复使用的进程池(大小为10)(2)创建演员(大小为100),通过ProcessIO与进程进行通信(3)如果所有进程都忙于处理,则演员阻塞直到有一个进程可用。这种选择的问题在于复杂性;当进程成为瓶颈时,100个演员必须执行与进程池交互的工作,而演员本身增加的价值很小。
更好的解决方案选项:(1)创建有限数量的演员(例如10个)(2)每个演员创建1个私有的长时间运行的进程(即没有池)(3)每个演员通过ProcessIO进行通信,如果进程繁忙,则阻塞。问题:仍然不够简单;演员与阻塞进程的交互效果不佳。
最佳解决方案选项:(1)没有演员,从主线程中的简单for循环将实现与演员相同的效益(2)创建有限数量的进程(10个)(3)通过for循环,使用ProcessIO依次交互每个进程(如果繁忙-阻塞或跳过下一次迭代)
“是否有任何方法设置此进程的超时时间并在超时后重试?”
确实有这个功能。actors最强大的特点之一是某些actor能够生成其他actor,同时作为它们的监督者(接收失败或超时消息,恢复/重新启动)。使用“原生scala actors”可以通过简单的编程完成,生成自己的检查和超时消息。但我不会涉及到这个,因为Akka方法更加强大且更简单。此外,下一个Scala主要版本(2.11)将采用Akka作为支持的actor模型,而“原生scala actors”已经过时deprecated

这里是一个示例Akka监督actor,带有自动化超时/重新启动的程序(未编译/测试)。当然,如果您选择第三种解决方案选项,这不会有什么用处:

import scala.concurrent.duration._
import scala.collection.immutable.Set

class Supervisor extends Actor {
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case _: ArithmeticException => Resume     // resumes (reuses) all child actors
      case _: NullPointerException => Restart   // restarts all child actors
      case _: IllegalArgumentException => Stop  // terminates this actor & all children
      case _: Exception => Escalate             // supervisor to receive exception
    }

  val worker = context.actorOf(Props[Worker])  // creates a supervised child actor
  var pendingRequests = Set.empty[WorkerRequest]

  def receive = {
    case req: WorkRequest(sender, jobReq) => 
      pendingRequests = pendingRequests + req
      worker ! req
      system.scheduler.scheduleOnce(10 seconds, self, WorkTimeout(req))
    case resp: WorkResponse(req @ WorkRequest(sender, jobReq), jobResp) => 
      pendingRequests = pendingRequests - req
      sender ! resp
    case timeout: WorkTimeout(req) =>
      if (pendingRequests get req != None) {
        // restart the unresponsive worker
        worker restart
        // resend all pending requests
        pendingRequests foreach{ worker ! _ }
      }
  }
}

警告:这种对于执行者监督的方法不能克服不良架构和设计。如果您从适合您需求的进程/线程/执行者设计开始,那么监督将有助于提高可靠性。但是,如果您从不良设计开始,则使用“强制恢复”来自操作系统级别故障的风险可能会加剧您的问题-使进程的可靠性变差甚至导致计算机崩溃。

1
我没有足够的信息来重现这个问题,所以我无法确切地诊断它,但是如果我站在你的角度,这就是我如何诊断它的基本方法是差异性诊断-确定可能的原因和证明或排除它们的测试。
我要做的第一件事是验证应用程序生成的myExecutable.sh进程是否真正终止。
如果进程没有终止,则这是问题的一部分,因此我们需要了解原因。我们可以做的一件事是运行其他东西而不是myExecutable.sh。您建议ByteArrayInputStream可能是问题的一部分,这表明myExecutable.shstdin上获得了错误的输入。如果是这种情况,那么您可以运行一个简单地将其输入记录到文件中的脚本,这将显示出来。如果输入无效,则ByteArrayInputStream由于某种原因提供了错误的数据-线程安全性和Unicode是明显的罪魁祸首,但查看实际的错误数据应该会给您提示。如果输入有效,则是myExecutable.sh中的一个错误。
如果进程正在终止,那么问题就出在其他地方。我的第一个猜测是它要么与演员调度有关(演员库通常使用ForkJoin进行执行,这很好,但不适用于阻塞代码),要么是scala.sys.process库中的错误(这并非没有先例 - 我曾经因为内存泄漏而不得不放弃我正在处理的项目中的scala.sys.process)。
查看挂起线程的堆栈跟踪应该会给你一些线索(VisualVM是你的朋友),因为你应该能够看到正在等待什么。然后,您可以在OpenJDK或Scala标准库源代码中找到相关代码。接下来的步骤取决于您找到了什么。

0

你能不能将这个进程及其处理放在未来执行,并对其进行定时等待?


我试图在理论上做到这一点,它应该可以工作。问题是我需要为每个future添加一个额外的线程。这会在我的应用程序中引起另一个问题。我已经为此创建了一个问题:http://stackoverflow.com/questions/22187186/scala-futures-creating-infinity-threads-in-actors - Daniel Cukier

0

我认为如果不知道myExecutable.sh或doSomethingWith,我们无法解决它。

当它挂起时,请尝试杀死所有的myExecutable.sh进程。

  • 如果有帮助,您应该检查myExecutable.sh。
  • 如果没有帮助,您应该检查doSomethingWith函数。

可执行文件为echoprint-codegen(https://github.com/echonest/echoprint-codegen)。它有两种工作方式: 1)将文件作为参数接收并处理文件 2)接收文件列表作为输入流并对其进行处理。正如编辑1中所述,第一种方法不会挂起,而第二种方法偶尔会挂起。 - Daniel Cukier
也许我误解了您使用ByteArrayInputStream的意思。我以为您没有为外部进程提供任何输入,这实质上可能意味着外部进程的行为会有所不同。这样做既不能确认也不能否认外部进程中存在问题。如果您通过不使用BAIS来表达不同的意思,那么您具体指的是什么? - v6ak
有两种执行外部进程的方式,两者结果相同:一种是使用文件名参数,另一种是使用输入流,其中包含一个单一文件名的列表(实际上,外部程序接受文件名列表,但在我的情况下,我只想处理一个)。 - Daniel Cukier

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接