使用Akka Actors处理多个TCP连接

3

我正在尝试使用akka actors建立一个简单的TCP服务器,它应该允许多个客户端同时连接。我将我的问题简化为以下简单程序:

package actorfail
import akka.actor._, akka.io._, akka.util._
import scala.collection.mutable._
import java.net._

case class Foo()

class ConnHandler(conn: ActorRef) extends Actor {
  def receive = {
    case Foo() => conn ! Tcp.Write(ByteString("foo\n"))
  }
}

class Server(conns: ArrayBuffer[ActorRef]) extends Actor {
  import context.system
  println("Listing on 127.0.0.1:9191")
  IO(Tcp) ! Tcp.Bind(self, new InetSocketAddress("127.0.0.1", 9191))
  def receive = {
    case Tcp.Connected(remote, local) =>
      val handler = context.actorOf(Props(new ConnHandler(sender)))
      sender ! Tcp.Register(handler)
      conns.append(handler)
  }
}

object Main {
  def main(args: Array[String]) {
    implicit val system = ActorSystem("Test")
    val conns = new ArrayBuffer[ActorRef]()
    val server = system.actorOf(Props(new Server(conns)))
    while (true)  {
      println(s"Sending some foos")
      for (c <- conns) c ! Foo()
      Thread.sleep(1000)
    }
  }
}

它绑定到localhost:9191并接受多个连接,将连接处理程序添加到全局数组中,并周期性地向每个连接发送字符串"foo"。现在当我尝试同时与多个客户端连接时,只有第一个客户端会收到"foo"。当我打开第二个连接时,它不会得到任何foo,而是会得到以下类型的日志消息:
Sending some foos
[INFO] [03/27/2015 21:24:07.331] [Test-akka.actor.default-dispatcher-6] [akka://Test/deadLetters] Message [akka.io.Tcp$Write] from Actor[akka://Test/user/$a/$b#-308726290] to Actor[akka://Test/deadLetters] was not delivered. [7] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

我理解这意味着我们试图发送Tcp.Write命令到的目标 actor 不再接受消息。但这是为什么呢?你能帮我理解其中潜在的问题吗?我该如何让它起作用?


在Actor内部使用可变状态是可以的,但是看起来你正在尝试从Actor外部访问Server Actor的连接。这不太符合Akka的风格。为什么不让Server管理连接池并在这些连接上发送消息呢? - Gangstead
@Gangstead,我刚刚测试了使用actor发送Foos的方法,似乎可以工作,请问您能否详细说明技术方面的细节或文档在哪里?我希望实际上有一种方法可以混合使用actor和其他并发模型。实际上,从外部访问actor的代码是一个循环,在阻塞方式下读取来自“Source”的行。当然,我可以将其转换为actor,但这将有点笨重,需要构建状态机。我更喜欢有一种通过ActorRef从外部调度消息的方法。 - Niklas B.
你从Akka项目的负责人那里得到了答复。他提供了一些文档链接。我认为你不能像希望的那样混合并发模型。 - Gangstead
@Gangstead 嗯,根据他的回答,我可以 :) - Niklas B.
1个回答

5
上述代码存在两个问题:
1. 在actor消息中发送可变状态并以非线程安全的方式进行修改。 2. 在Props中包含不稳定的引用。
在详细说明之前,请先阅读文档,点击这里这里,其中已经覆盖了这些内容。
可变消息:
ArrayBuffer不是线程安全的,但您将其从主例程传递给不同的actor,然后独立(并发)地修改它。这将导致更新丢失或数据结构本身损坏。另一方面,如果没有适当的同步,不能保证主线程会看到这些修改,因为编译器可以原则上确定缓冲区在while循环中不会改变,并相应地优化代码。
与其依赖于共享的可变状态,不如只发送消息。在这种情况下,解决方案是将while循环提升为actor(但在阻塞的Thread.sleep(1000)调用之后安排一个消息到self)。然后连接处理程序只需要传递此foo发送者actor的ActorRef,它们将向其发送消息以注册自己,然后该actor在其封装的范围内保留活动连接列表。这样做的好处是,您可以使用DeathWatch在终止连接时也删除连接。
Props中的不稳定引用:
有问题的代码:Props(new ConnHandler(sender))
Props是从actor工厂构建的,在这种情况下,它作为一个按名称参数获取;整个新表达式将在稍后的时间点进行评估,每当这样的actor正在初始化时——可能在不同的线程上。这意味着sender也将在稍后从该执行上下文中进行评估,并且因此它很可能是deadLetters(如果父actor当前未运行——如果正在运行,则sender可能完全指向错误的actor)。
解决方案已在此处记录。

我只在一个单一的位置修改ArrayBuffer,我认为一个actor一次只能处理一个消息?当然它仍然是有问题的,因为还有另一个actor读取数组,但那只是为了演示目的,在真正的程序中并不是这样的。你对更有趣的问题非常准确,我稍后会尝试解决方案 :) - Niklas B.
即使您在一个位置上修改了数组(您是正确的,在这种特定而狭窄的情况下是安全的),外部观察者可能会看到缓冲区处于不一致状态,甚至可能会向后“跨步”,因为参与者在线程和CPU核心之间反弹。共享可变状态不仅具有明显的问题类别;-) - Roland Kuhn
这只是一个简单的例子,在真正的程序中我不会做任何这样的事情。将props关闭在构造函数参数上起作用了,谢谢! - Niklas B.

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接