Scala akka-http WebSocket:如何保存客户端连接并在需要时向客户端推送消息?

6

如何将客户端(Web)连接保存在内存变量中,然后在需要时向客户端(Web)发送出站消息?

我已经有一些简单的代码,可以在服务器接收到客户端消息后将消息推送回客户端。如何修改下面的代码以实现出站消息部分?

implicit val actorSystem = ActorSystem("akka-system")
implicit val flowMaterializer = ActorMaterializer()
implicit val executionContext = actorSystem.dispatcher

val ip = "127.0.0.1"
val port = 32000

val route = get {
    pathEndOrSingleSlash {
        complete("Welcome to websocket server")
    }
} ~
path("hello") {
    get {
        handleWebSocketMessages(echoService)
    }
}

def sendMessageToClient(msg : String) {

    // *** How to implement this?
    // *** How to save the client connection when it is first connected?
    //     Then how to send message to this connection?

}

val echoService = Flow[Message].collect {

    // *** Here the server push back messages when receiving msg from client

    case tm : TextMessage => TextMessage(Source.single("Hello ") ++ tm.textStream)
    case _ => TextMessage("Message type unsupported")
}

val binding = Http().bindAndHandle(route, ip, port)

2
这篇博客可能会有用:https://markatta.com/codemonkey/blog/2016/04/18/chat-with-akka-http-websockets/ - johanandren
1
我有同样的问题。 - Rob O'Doherty
1个回答

1
你可以通过使用.map调用来查看管道的流程。在.map调用内部,你可以捕获值,然后返回相同的消息。例如:

  Flow[Message].collect {
    case tm : TextMessage =>
      TextMessage(Source.single("Hello ") ++ tm.textStream.via(
        Flow[String].map((message) => {println(message) /* capture value here*/; message})))
    case _ => TextMessage("Message type unsupported")
  }

现在,如果您的意图是处理这些值并稍后发送值,则您需要的不是单个源到汇流,而是两个分别用于汇和源的流,您可以使用Flow.fromSinkAndSource
Flow.fromSinkAndSource[Message, Message](
  Flow[Message].collect { /* capture values */},
    // Or send stream to other sink for more processing
  source
)

很可能,此源将由图形DSL构建,手动编写的actor,或者您可以查看使用可重用的helpers,例如MergeHub

我知道这是一个旧答案,但我在理解你的解决方案时遇到了麻烦。难道你不只是捕获消息值吗?在你的代码中处理后如何发送答案呢? - Florian Baierl

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接