我该如何创建一个 Rust Websocket 客户端?

12

我尝试使用不同的库和实现,但我无法在Rust中获得一个可用的WebSocket客户端/监听器。

我尝试编写了一个处理程序:

extern crate ws;

use ws::{connect, listen, Handler, Sender, Handshake, Result, Message, CloseCode};

struct Client {
    out: Sender,
}

impl Handler for Client {
    fn on_open(&mut self, _: Handshake) -> Result<()> {
        self.out.send(r#"{"action": "authenticate","data": {"key_id": "<API_KEY>","secret_key": "<API_SECRET>"}}"#);
        self.out.send(r#"{"action": "listen","data": {"streams": ["AM.SPY"]}}"#)
    }

    fn on_message(&mut self, msg: Message) -> Result<()> {
        println!("message: {}", msg);
        Ok(())
    }
}

fn main() {
    if let Err(error) = listen("wss://data.alpaca.markets/stream", |out| {
        Client { out: out }
    }) {
        println!("Failed to create WebSocket due to: {:?}", error);
    }
}

我也尝试了这个:

extern crate ws;

use ws::{connect, CloseCode};

fn main() {
    if let Err(error) = connect("wss://data.alpaca.markets/stream", |out| {
        if out.send(r#"{"action": "authenticate","data": {"key_id": "<API_KEY>","secret_key": "<API_SECRET>"}}"#).is_err() {
            println!("Websocket couldn't queue an initial message.")
        } else {
            println!("Client sent message 'Hello WebSocket'. ")
        };

        if out.send(r#"{"action": "listen","data": {"streams": ["AM.SPY"]}}"#).is_err() {
            println!("Websocket couldn't queue an initial message.")
        } else {
            println!("Client sent message 'Hello WebSocket'. ")
        };

        move |msg| {
            println!("message: '{}'. ", msg);

            Ok(())
        }
    }) {
        println!("Failed to create WebSocket due to: {:?}", error);
    }
}

为确保我尝试连接的问题不是问题所在,我用JS编写了相同的代码。这是可行的。
const ws = require("ws");

const stream = new ws("wss://data.alpaca.markets/stream");

stream.on("open", () => {
    stream.send('{"action": "authenticate","data": {"key_id": "<API_KEY>","secret_key": "API_SECRET"}}');
    stream.send('{"action": "listen","data": {"streams": ["AM.SPY"]}}');
});

stream.on("message", (bar) => {
    process.stdout.write(`${bar}\n`);
});

在这两个Rust代码示例中,代码都可以编译和运行,但是on_open函数和lambda函数从未被调用。
提前感谢您。

你尝试连接到另一个 WebSocket 服务器(并发送一个简单的消息)来验证你的库是否正常工作了吗?可能存在一些特定于你正在尝试连接的服务器的问题。 - user4815162342
我曾经暂停了这个项目,但现在我已经修复好了,我使用tungstenite让它正常工作。 - Loading
3个回答

33
对于面临相同问题的任何人,我建议使用 tungstenite,对于异步Websockets,请使用 tokio-tungstenite
这是最终为我工作的代码:
use url::Url;
use tungstenite::{connect, Message};

let (mut socket, response) = connect(
    Url::parse("wss://data.alpaca.markets/stream").unwrap()
).expect("Can't connect");

socket.write_message(Message::Text(r#"{
    "action": "authenticate",
    "data": {
        "key_id": "API-KEY",
        "secret_key": "SECRET-KEY"
    }
}"#.into()));

socket.write_message(Message::Text(r#"{
    "action": "listen",
    "data": {
        "streams": ["AM.SPY"]
    }
}"#.into()));

loop {
    let msg = socket.read_message().expect("Error reading message");
    println!("Received: {}", msg);
}

而这是在Cargo.toml文件中的内容:

[dependencies]
tungstenite = {version = "0.16.0", features = ["native-tls"]}
url = "2.2.2"

我面临的问题是,我使用的方法不适用于TLS流,而是适用于TCP流。使用tungstenite时,如果启用native-tls功能,则connect方法可以正确处理TCP和TLS流。

2
谢谢您发布这篇文章,它对我非常有帮助。 - Nate Houk
嗨,正在加载中。 我只是复制了你的代码进行测试,并得到了以下结果:“thread 'main' panicked at 'Can't connect: Tls(Native(Ssl(Error { code: ErrorCode(1), cause: Some(Ssl(ErrorStack([Error { code: 336134278, library: "SSL routines", function: "ssl3_get_server_certificate", reason: "certificate verify failed", file: "s3_clnt.c", line: 1264 }]))) }, X509VerifyResult { code: 10, error: "certificate has expired" })))', src/main.rs:7:7 "。有什么问题吗? - Henry

0

tokio_tungstenite怎么样?

use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;


let strurl = format!("{}/ws?listenKey={}", WSS_URL, &listenkey);
let url = Url::parse(&strurl)?;
let (mut socket, _) = connect_async(url).await?;

let msg_tobe_send = Message::text("{\"method\":\"ping\"}").into();

loop {
  socket.send(msg_tobe_send).await?;
  let message = socket.next().await.ok_or("No response from server")??;
  println!("Received message: = {:?}", message);
}

在发送行处,出现了以下错误: 当前范围内的结构体WebSocketStream没有找到名为send的方法 只有当特征在范围内时,才能使用特征中的项

0
看一下导入的StreamExt和SinkExt traits是必需的,以便send成为一个函数。下面的代码是有效的。我想建议tokio-tungstenite的文档对于初学者来说真的很糟糕,下面的内容花了我很长时间才找到。tokio-tungstenite的开发人员,请更新你们的crate,提供更多的信息和示例。
感谢https://github.com/snapview/tokio-tungstenite/issues/137#issuecomment-732018849
use tokio::io::{AsyncWriteExt, Result};
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
use futures_util::{StreamExt, SinkExt};

#[tokio::main]
pub async fn main() -> Result<()> {
    println!("Hello, tokio-tungstenite!");

    let url = url::Url::parse("wss://ws.kraken.com").unwrap();

    let (ws_stream, _response) = connect_async(url).await.expect("Failed to connect");
    println!("WebSocket handshake has been successfully completed");

    let (mut write, read) = ws_stream.split();

    println!("sending");

    write.send(Message::Text(r#"{
        "event": "ping",
        "reqid": 42
      }"#.to_string()+"\n")).await.unwrap();

    println!("sent");

    let read_future = read.for_each(|message| async {
        println!("receiving...");
         let data = message.unwrap().into_data();
         tokio::io::stdout().write(&data).await.unwrap();
         println!("received...");
    });

    read_future.await;

    Ok(())
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接