为什么 `tokio::main` 报告错误 "cycle detected when processing"?

3
我正在使用Tokio和async/.await来创建一个UDP服务器,可以异步地接收和发送数据。
我的UDP套接字的SendHalf被多个任务共享。为了实现这一点,我使用了Arc<Mutex<SendHalf>>。这就是为什么Arc<Mutex<_>>存在的原因。
use tokio::net::UdpSocket;
use tokio::net::udp::SendHalf;
use tokio::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::net::SocketAddr;

struct Packet {
    sender: Arc<Mutex<SendHalf>>,
    buf: [u8; 512],
    addr: SocketAddr,
}

#[tokio::main]
async fn main() {
    let server = UdpSocket::bind(("0.0.0.0", 44667)).await.unwrap();
    let (mut server_rx, mut server_tx) = server.split();
    let sender = Arc::new(Mutex::new(server_tx));
    let (mut tx, mut rx) = mpsc::channel(100);

    tokio::spawn(async move {
        loop {
            let mut buffer = [0; 512];
            let (_, src) = server_rx.recv_from(&mut buffer).await.unwrap();
            let packet = Packet {
                sender: sender.clone(),
                buf: buffer,
                addr: src,
            };
            tx.send(packet).await;
        }
    });

    while let Some(packet) = rx.recv().await {
        tokio::spawn(async move {
            let mut socket = packet.sender.lock().unwrap();
            socket.send_to(&packet.buf, &packet.addr).await.unwrap();
        });
    }
}

这里也有一个沙盒环境

我遇到了一个编译器错误,但我不理解:

error[E0391]: cycle detected when processing `main`
  --> src/main.rs:13:1
   |
13 | #[tokio::main]
   | ^^^^^^^^^^^^^^
   |
note: ...which requires processing `main::{{closure}}#0::{{closure}}#1`...
  --> src/main.rs:34:33
   |
34 |           tokio::spawn(async move {
   |  _________________________________^
35 | |             let mut socket = packet.sender.lock().unwrap();
36 | |             socket.send_to(&packet.buf, &packet.addr).await.unwrap();
37 | |         });
   | |_________^
   = note: ...which again requires processing `main`, completing the cycle
note: cycle used when processing `main::{{closure}}#0`
  --> src/main.rs:13:1
   |
13 | #[tokio::main]
   | ^^^^^^^^^^^^^^

为什么我的代码会产生循环?为什么调用需要处理 main

这个错误的详细含义是什么?我想要理解发生了什么。

2个回答

5
根据 tokio 文档,在涉及从任务中使用 !Send 时:

在调用 .await 的过程中保持 !Send 值将导致类似于以下不友好的编译错误消息:

[... 某些类型 ...] 不能在线程之间安全地发送

或者:

error[E0391]: cycle detected when processing main

你正在看到这个确切的错误。当你 锁定一个 Mutex 时:
pub fn lock(&self) -> LockResult<MutexGuard<T>>

该函数返回一个MutexGuard,它是!Send的:

impl<'_, T: ?Sized> !Send for MutexGuard<'_, T>

这个可以成功编译:

#[tokio::main]
async fn main() {
    ...

    while let Some(packet) = rx.recv().await {
        let mut socket = packet.sender.lock().unwrap();
        socket.send_to(&packet.buf, &packet.addr).await.unwrap();
    }
}

这正是我想要解释错误的内容,谢谢。你的修复程序可以编译,但它也改变了行为。我不想在 while 循环中使用阻塞调用。有没有更好的方法来在任务之间共享套接字的 SendHalf - createproblem
@createproblem 为了确保大家在同一页面上,你说的是哪个调用正在阻塞? - Shepmaster
@Shepmaster socket.send_to(&packet.buf, &packet.addr).await.unwrap(); 这是阻塞的,因为 while 循环在数据发送之前不会继续处理。是吗? - createproblem
@createproblem 不,那行代码并不是阻塞的;这就是使用 futures 和 async / .await 语法的全部意义所在。在发送完成之前,该行代码后面的代码将不会执行,但其他异步任务仍然可以继续进行进展 - 没有任何阻塞 - Shepmaster
1
话虽如此,Mutex::lock是阻塞的,并且会阻止所有任务继续执行。通常情况下,在异步代码中,您希望减少使用这种类型(理想情况下降到零)。 - Shepmaster

-1

我遇到了非常相似的问题(我使用的是RwLock而不是Mutex,但结构几乎完全相同)。我想异步处理每个UDP数据包,而不是在读取另一个数据包之前等待处理,就像这里的其他解决方案一样。

我为解决问题所做的更改是使用tokio::sync::*同步结构,而不是来自std的结构。因此,在您的情况下,将std::sync::Mutex替换为tokio::sync::Mutex(并将.unwrap()替换为.await),然后您应该没问题。(文档

我已经对你的游乐场进行了更改,现在正在编译: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=2b6114a12a5f784a38a595bc521f6f02 (同时非常感谢Tokio Slack在我有同样问题时提供答案。只是传递一下)。

请注意,你的答案与我所提供的不同,并不意味着我的答案是错误的。我的回答解决了根本原因并提供了一种出路。在给我点踩前,请先看一下评论,做个基本的礼貌吧。 - edwardw
我非常尊重地不同意。原帖作者希望数据在不阻塞循环的情况下被发送。我的解决方案解决了这个问题,或者至少保持了代码的精神,而你的解决方案则没有。话虽如此,你的解决方案确实适当地回答了“为什么”的部分,这是非常有价值的。 - Mark Mandel

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接