我正在使用Tokio和
我的UDP套接字的
async
/.await
来创建一个UDP服务器,可以异步地接收和发送数据。我的UDP套接字的
SendHalf
被多个任务共享。为了实现这一点,我使用了Arc<Mutex<SendHalf>>
。这就是为什么Arc<Mutex<_>>
存在的原因。use tokio::net::UdpSocket;
use tokio::net::udp::SendHalf;
use tokio::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::net::SocketAddr;
struct Packet {
sender: Arc<Mutex<SendHalf>>,
buf: [u8; 512],
addr: SocketAddr,
}
#[tokio::main]
async fn main() {
let server = UdpSocket::bind(("0.0.0.0", 44667)).await.unwrap();
let (mut server_rx, mut server_tx) = server.split();
let sender = Arc::new(Mutex::new(server_tx));
let (mut tx, mut rx) = mpsc::channel(100);
tokio::spawn(async move {
loop {
let mut buffer = [0; 512];
let (_, src) = server_rx.recv_from(&mut buffer).await.unwrap();
let packet = Packet {
sender: sender.clone(),
buf: buffer,
addr: src,
};
tx.send(packet).await;
}
});
while let Some(packet) = rx.recv().await {
tokio::spawn(async move {
let mut socket = packet.sender.lock().unwrap();
socket.send_to(&packet.buf, &packet.addr).await.unwrap();
});
}
}
这里也有一个沙盒环境。
我遇到了一个编译器错误,但我不理解:
error[E0391]: cycle detected when processing `main`
--> src/main.rs:13:1
|
13 | #[tokio::main]
| ^^^^^^^^^^^^^^
|
note: ...which requires processing `main::{{closure}}#0::{{closure}}#1`...
--> src/main.rs:34:33
|
34 | tokio::spawn(async move {
| _________________________________^
35 | | let mut socket = packet.sender.lock().unwrap();
36 | | socket.send_to(&packet.buf, &packet.addr).await.unwrap();
37 | | });
| |_________^
= note: ...which again requires processing `main`, completing the cycle
note: cycle used when processing `main::{{closure}}#0`
--> src/main.rs:13:1
|
13 | #[tokio::main]
| ^^^^^^^^^^^^^^
为什么我的代码会产生循环?为什么调用需要处理 main
?
这个错误的详细含义是什么?我想要理解发生了什么。
SendHalf
? - createproblemsocket.send_to(&packet.buf, &packet.addr).await.unwrap();
这是阻塞的,因为 while 循环在数据发送之前不会继续处理。是吗? - createproblemasync
/.await
语法的全部意义所在。在发送完成之前,该行代码后面的代码将不会执行,但其他异步任务仍然可以继续进行进展 - 没有任何阻塞。 - ShepmasterMutex::lock
是阻塞的,并且会阻止所有任务继续执行。通常情况下,在异步代码中,您希望减少使用这种类型(理想情况下降到零)。 - Shepmaster