使用Scala Actors时,如何处理阻塞操作?

7

我大约两天前开始学习scala actors框架。为了让我的想法更加具体,我决定实现一个基于TCP的回显服务器,可以处理多个同时连接。

这是回显服务器的代码(不包括错误处理):

class EchoServer extends Actor {
  private var connections = 0

  def act() {
    val serverSocket = new ServerSocket(6789)

    val echoServer = self
    actor { while (true) echoServer ! ("Connected", serverSocket.accept) }

    while (true) {
      receive {
        case ("Connected", connectionSocket: Socket) =>
          connections += 1
          (new ConnectionHandler(this, connectionSocket)).start
        case "Disconnected" =>
          connections -= 1
      }
    }
  }
}

基本上,服务器是一个Actor,处理“已连接”和“断开连接”消息。它将连接监听委托给一个匿名的Actor,该Actor调用serverSocket上的accept()方法(一个阻塞操作)。当连接到达时,它通过“已连接”消息通知服务器,并传递与新连接的客户端进行通信所使用的套接字。ConnectionHandler类的实例处理与客户端的实际通信。
以下是连接处理程序的代码(包括一些错误处理):
class ConnectionHandler(server: EchoServer, connectionSocket: Socket)
    extends Actor {

  def act() {
    for (input <- getInputStream; output <- getOutputStream) {
      val handler = self
      actor {
        var continue = true
        while (continue) {
          try {
            val req = input.readLine
            if (req != null) handler ! ("Request", req)
            else continue = false
          } catch {
            case e: IOException => continue = false
          }
        }

        handler ! "Disconnected"
      }

      var connected = true
      while (connected) {
        receive {
          case ("Request", req: String) =>
            try {
              output.writeBytes(req + "\n")
            } catch {
              case e: IOException => connected = false
            }
          case "Disconnected" =>
            connected = false
        }
      }
    }

    close()
    server ! "Disconnected"
  }

  // code for getInputStream(), getOutputStream() and close() methods
}

连接处理程序使用一个匿名的actor,通过调用socket的输入流上的readLine()方法(一个阻塞操作)等待请求被发送到socket。当接收到请求时,将向处理程序发送“Request”消息,然后简单地将请求回显到客户端。如果处理程序或匿名actor遇到底层socket问题,则关闭socket并向回声服务器发送“Disconnect”消息,表示客户端已从服务器断开连接。
因此,我可以启动回声服务器并让它等待连接。然后我可以打开一个新终端并通过telnet连接到服务器。我可以发送请求并得到正确的响应。现在,如果我打开另一个终端并连接到服务器,服务器会注册连接,但无法为这个新连接启动连接处理程序。当我通过任何现有连接之一发送消息时,我没有立即得到响应。这里有趣的部分是,当我终止除一个现有客户端连接之外的所有连接并保持客户端X打开时,然后所有通过客户端X发送的请求的响应都返回了。我做了一些测试,并得出结论,在创建连接处理程序时,act()方法不会在后续客户端连接上被调用,即使我调用start()方法。
我想我在我的连接处理程序中处理阻塞操作不正确。由于先前的连接由一个具有匿名actor阻塞等待请求的连接处理程序处理,所以我认为该阻塞的actor正在防止其他actor(连接处理程序)启动。
在使用Scala actors时,我应该如何处理阻塞操作?
任何帮助都将不胜感激。
1个回答

4

来自Scala.actors.Actor的scaladoc

注意:在调用Actor特质或其伴生对象提供之外的阻塞线程方法(例如receive)时,必须小心。在Actor内部阻塞底层线程可能导致其他Actor饥饿。这也适用于在调用receive/react之间将其线程占用时间较长的Actor。

如果Actor使用了阻塞操作(例如,阻塞I/O的方法),则有以下几个选项:

  • 可以配置运行时系统以使用更大的线程池大小(例如,通过设置actors.corePoolSize JVM属性)。
  • 可以重写Actor特质的scheduler方法,使其返回一个ResizableThreadPoolScheduler,该调度程序调整其线程池大小以避免由调用任意阻塞方法的Actor引起的饥饿问题。
  • 可以将JVM属性actors.enableForkJoin设置为false,此时默认使用ResizableThreadPoolScheduler执行Actor。

非常感谢。我设置了actors.corePoolSize JVM属性,现在我的回声服务器可以正确处理多个连接。接下来是重新设计它,使其不使用每个连接一个线程的方式。如果你有想法,我真的很想听听。 - Dwayne Crooks
如果我要开始一个类似的项目,我可能会尝试使用Naggati(http://github.com/robey/naggati/blob/master/src/main/scala/overview.html)。 - Alex Cruise
我一定会用它来获得灵感。 - Dwayne Crooks
我不确定,但我认为Akka可能具有非阻塞IO的内置支持。 - Erik Engbrecht
谢谢Erik。Akka看起来是一个非常棒的项目。我一定会尝试它。 - Dwayne Crooks

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接