Akka TCP command failed

4

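I am writing client and server applications with Akka Tcp and have run into a problem at high throughput. When the client writes too many messages, I get a large number of CommandFailed messages, and I cannot work out why. Here is my server: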

class Server(listener: ActorRef) extends Actor {

  import Tcp._
  import context.system

  IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 9090))

  def receive = {
    case CommandFailed(_: Bind) => {
      println("command failed error")
      context stop self
    }

    case c@Tcp.Connected(remote, local) =>
      listener ! GatlingConnected(c.remoteAddress.toString)
      println("Connected: " + c.remoteAddress)
      val handler = context.actorOf(Props(classOf[ServerHandler], listener, c.remoteAddress.toString))
      val connection = sender
      connection ! Register(handler)
  }
}

class ServerHandler(listener: ActorRef, remote: String) extends Actor {

  import Tcp._

  override def receive: Receive = {
    case Received(data) => listener ! data.utf8String
    case PeerClosed => {
      listener ! Finished(remote)
      context stop self
    }
  }
}

Message and Finished are just case classes that I created. Here is the client code (which I believe is the source of the problem):

private class TCPMessageSender(listener: ActorRef) extends BaseActor {
    final val MESSAGE_DELIMITER = "\n"
    val buffer = new ListBuffer[Any]
    val failedMessages = new ListBuffer[Write]
    IO(Tcp) ! Connect(new InetSocketAddress(configuration.data.tcp.host, configuration.data.tcp.port))

    override def receive = {
      case msg @ (_: UserMessage | _: GroupMessage | _: RequestMessage) =>
        logger.warn(s"Received message ($msg) before connected. Buffering...")
        buffer += msg
      case CommandFailed(_: Connect) =>
        logger.warn("Can't connect. All messages will be ignored")
        listener ! Terminate
        context stop self
      case c @ Connected(remote, local) =>
        logger.info("Connected to " + c.remoteAddress)
        val connection = sender
        connection ! Register(self)
        logger.info("Sending previous received messages: " + buffer.size)
        buffer.foreach(msg => {
          val msgString: String = JsonHelper.toJson(Map[String, Any]("message_type" -> msg.getClass.getSimpleName, "message" -> msg))
          connection ! Write(ByteString(msgString + MESSAGE_DELIMITER))
        })
        buffer.clear
        logger.info("Sent")
        context become {
          case msg @ (_: UserMessage | _: GroupMessage | _: RequestMessage) =>
            val msgString: String = JsonHelper.toJson(Map[String, Any]("message_type" -> msg.getClass.getSimpleName, "message" -> msg))
            logger.trace(s"Sending message: $msgString")
            connection ! Write(ByteString(msgString + MESSAGE_DELIMITER))
          case CommandFailed(w: Write) =>
            logger.error("Command failed. Buffering message...")
            failedMessages += w
            connection ! ResumeWriting
          case CommandFailed(c) => logger.error(s"Command $c failed. I don't know what to do...")
          case Received(data) =>
            logger.warn(s"I am not supposed to receive this data: $data")
          case "close" =>
            connection ! Close
          case _: ConnectionClosed =>
            logger.info("Connection closed")
            context stop self
          case WritingResumed => {
            logger.info("Sending failed messages")
            failedMessages.foreach(write => connection ! write)
            failedMessages.clear
          }
        }
    }
  }

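Sometimes I receive a lot of CommandFailed messages, and although I send ResumeWriting I never receive a WritingResumed message (in that case the connection is never closed either). Am I doing something wrong?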

Did you find any solution? - onur taskin
Unfortunately I never managed to solve it; the project was cancelled before I could find an alternative :( - Augusto
1 Answer

2

I think the problem is that when you send the Register message to register your actor, you also have to set the useResumeWriting parameter to true:

connection ! Register(handler, keepOpenOnPeerClosed = false, useResumeWriting = true)

The documentation for the ResumeWriting command reads:
When `useResumeWriting` is in effect as was indicated in the [[Tcp.Register]] message
then this command needs to be sent to the connection actor in order to re-enable
writing after a [[Tcp.CommandFailed]] event. All [[Tcp.WriteCommand]] processed by the
connection actor between the first [[Tcp.CommandFailed]] and subsequent reception of
this message will also be rejected with [[Tcp.CommandFailed]].
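
To make the quoted behaviour concrete, here is a minimal sketch of a handler that registers with useResumeWriting switched on and replays rejected writes once WritingResumed arrives. It is only an illustration, not the asker's code: the class name NackWriter, the `suspended` flag and the unbounded `rejected` queue are my own assumptions, and it expects to be handed an already connected `connection` actor. In the Akka versions I know, useResumeWriting already defaults to true, so check the Register signature of your version before relying on this as the fix.

```scala
import akka.actor.{Actor, ActorRef}
import akka.io.Tcp
import akka.util.ByteString
import scala.collection.immutable.Queue

// Hypothetical handler (name and buffering strategy are assumptions):
// forwards ByteString payloads to an established connection and
// replays the writes that the connection actor rejected.
class NackWriter(connection: ActorRef) extends Actor {
  import Tcp._

  // Register with useResumeWriting spelled out explicitly.
  connection ! Register(self, keepOpenOnPeerClosed = false, useResumeWriting = true)

  // Writes rejected by the connection, or held back while writing is suspended.
  private var rejected = Queue.empty[Write]
  // True between the first CommandFailed and the matching WritingResumed.
  private var suspended = false

  def receive: Receive = {
    case data: ByteString =>
      val write = Write(data)
      if (suspended) rejected = rejected.enqueue(write) // hold locally, do not send
      else connection ! write

    case CommandFailed(w: Write) =>
      rejected = rejected.enqueue(w)
      if (!suspended) {
        // Ask once per failure burst, not once per failed write.
        suspended = true
        connection ! ResumeWriting
      }

    case WritingResumed =>
      suspended = false
      val toReplay = rejected
      rejected = Queue.empty
      toReplay.foreach(connection ! _) // replay in the original order

    case _: ConnectionClosed =>
      context.stop(self)
  }
}
```

The main difference from the client in the question is that ResumeWriting is sent only once per burst of failures, and new messages are queued locally instead of being written while the connection is still rejecting. A real implementation would also need to bound the queue, which this sketch does not do.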
