我大约两天前开始学习scala actors框架。为了让我的想法更加具体,我决定实现一个基于TCP的回显服务器,可以处理多个同时连接。
这是回显服务器的代码(不包括错误处理):
class EchoServer extends Actor {
private var connections = 0
def act() {
val serverSocket = new ServerSocket(6789)
val echoServer = self
actor { while (true) echoServer ! ("Connected", serverSocket.accept) }
while (true) {
receive {
case ("Connected", connectionSocket: Socket) =>
connections += 1
(new ConnectionHandler(this, connectionSocket)).start
case "Disconnected" =>
connections -= 1
}
}
}
}
基本上,服务器是一个Actor,处理“已连接”和“断开连接”消息。它将连接监听委托给一个匿名的Actor,该Actor调用serverSocket上的accept()方法(一个阻塞操作)。当连接到达时,它通过“已连接”消息通知服务器,并传递与新连接的客户端进行通信所使用的套接字。ConnectionHandler类的实例处理与客户端的实际通信。
以下是连接处理程序的代码(包括一些错误处理):
class ConnectionHandler(server: EchoServer, connectionSocket: Socket)
extends Actor {
def act() {
for (input <- getInputStream; output <- getOutputStream) {
val handler = self
actor {
var continue = true
while (continue) {
try {
val req = input.readLine
if (req != null) handler ! ("Request", req)
else continue = false
} catch {
case e: IOException => continue = false
}
}
handler ! "Disconnected"
}
var connected = true
while (connected) {
receive {
case ("Request", req: String) =>
try {
output.writeBytes(req + "\n")
} catch {
case e: IOException => connected = false
}
case "Disconnected" =>
connected = false
}
}
}
close()
server ! "Disconnected"
}
// code for getInputStream(), getOutputStream() and close() methods
}
连接处理程序使用一个匿名的actor,通过调用socket的输入流上的readLine()方法(一个阻塞操作)等待请求被发送到socket。当接收到请求时,将向处理程序发送“Request”消息,然后简单地将请求回显到客户端。如果处理程序或匿名actor遇到底层socket问题,则关闭socket并向回声服务器发送“Disconnect”消息,表示客户端已从服务器断开连接。
因此,我可以启动回声服务器并让它等待连接。然后我可以打开一个新终端并通过telnet连接到服务器。我可以发送请求并得到正确的响应。现在,如果我打开另一个终端并连接到服务器,服务器会注册连接,但无法为这个新连接启动连接处理程序。当我通过任何现有连接之一发送消息时,我没有立即得到响应。这里有趣的部分是,当我终止除一个现有客户端连接之外的所有连接并保持客户端X打开时,然后所有通过客户端X发送的请求的响应都返回了。我做了一些测试,并得出结论,在创建连接处理程序时,act()方法不会在后续客户端连接上被调用,即使我调用start()方法。
我想我在我的连接处理程序中处理阻塞操作不正确。由于先前的连接由一个具有匿名actor阻塞等待请求的连接处理程序处理,所以我认为该阻塞的actor正在防止其他actor(连接处理程序)启动。
在使用Scala actors时,我应该如何处理阻塞操作?
任何帮助都将不胜感激。