如何使用Akka客户端WebSocket向WebSocket服务器发出多个请求。

3

我刚接触Akka WebSockets并正在学习Akka客户端WebSockets。参考链接:https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html

我正在为我的WebRTC Janus服务器使用WebSockets,因此我有一个URL并需要向其发送多条消息,并每次接收不同的响应,然后根据该响应发送进一步的消息。在这里,我感到困惑,因为我不知道如何实现这一点。通过查看示例代码,我认为每次需要向服务器发送消息时都需要重复以下代码,但它似乎不正确。那么正确的方法是什么?

以我的情况为例:

WebSocket服务器运行在ws://0.0.0.0:8188上。

首先,我将向服务器发送一条消息以启动会话ID。

request# 1

{
        "janus" : "create",
        "transaction" : "<random alphanumeric string>"
}

服务器将会返回会话ID。
response #1 
{
   "janus": "success",
   "session_id": 2630959283560140,
   "transaction": "asqeasd4as3d4asdasddas",
   "data": {
        "id": 4574061985075210
   }
}

然后根据id 4574061985075210,我将发送另一条消息并接收更多信息。

request # 02 {
 }

response # 02 {
}
----

如何使用Akka客户端侧的WebSockets实现此功能?
这是我的代码。
import akka.http.scaladsl.model.ws._

import scala.concurrent.Future

object WebSocketClientFlow {
  def main(args: Array[String]) = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    val incoming: Sink[Message, Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println(message.text)
//suppose here based on the server response i need to send another message to the server and so on do i need to repeat this same code here again ?????   

      }

    val outgoing = Source.single(TextMessage("hello world!"))

    val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))

    val (upgradeResponse, closed) =
      outgoing
        .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incoming)(Keep.both) // also keep the Future[Done]
        .run()

    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }

    connected.onComplete(println)
    closed.foreach(_ => println("closed"))
  }
}

我个人更喜欢使用不同的方法,而不是直接使用akka-streams。我的方法是创建一个代表与服务器的websocket连接的actor,在那里你应该有一个代表实际ws连接到服务器的actor的引用。我还没有为客户端完成这个,但如果这个评论不够清楚,我可以尝试一下。使用actor更好,因为大多数情况下,你在websocket服务器连接之间有一个可变状态。 - AlexITC
我已经有很长时间没碰akka-streams了,另一种方法是在incoming sink之前定义“outgoing”源,以便在处理传入消息的函数上可见该源。然后,将另一个项目提供给此类源流应该很简单(它将自动发送到您的WebSocket服务器)。 - AlexITC
显然,这就是我在第一条评论中提到的内容:https://doc.akka.io/docs/akka/current/stream/operators/ActorSource/actorRef.html - AlexITC
你能否举个例子解释一下你的第一个评论,这样对我来说会更清晰和有帮助。 - swaheed
@AlexITC,你能看一下这个问题吗?https://stackoverflow.com/questions/63505847/how-to-use-flow-map-in-akka-client-side-websocket-in-akka-streams - swaheed
我认为这个例子可以满足你的需求,但我个人觉得它有点复杂:https://github.com/ticofab/akka-http-websocket-example/blob/master/src/main/scala/io/ticofab/example/AkkaHttpWSExampleApp.scala - AlexITC
1个回答

0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接