如何处理tokio::spawn需要'static和&self闭包的问题?

5
我对如何编写并发异步代码封装在一个单一结构中感到困惑。
我不确定如何准确地解释这个问题,所以我将尝试通过一个例子来说明。
假设我有一个名为 `UdpServer` 的结构体。该结构体有多个与其行为相关的方法(例如 `handle_datagram`、`deserialize_datagram` 等)。 如果要使代码并发,我将生成一个 tokio 任务,这需要提供给它的闭包是静态的,这意味着我不能从此任务中调用 `&self`,只要 `&self` 不是静态的,这意味着我无法调用 `self.serialize_datagram()`。
我理解这个问题(没有保证结构体会比线程更长时间存在),但是找不到适当的解决方法。我知道可以将函数移出 impl,但这对我来说不是一个好的解决方法。 此外,即使我们暂时假设我可以将`&self`作为静态参数,对我来说,这段代码看起来仍然不正确(可能是因为不够 Rusty)。 另一种“解决方案”是使用 `self: Arc` 代替 `&self`,但这甚至更糟糕。
因此,我假设我不知道的某种模式。 谁能向我解释应该如何重构整个代码?
示例代码:
struct UdpServer {}
impl UdpServer {
    pub async fn run(&self) {
        let socket = UdpSocket::bind(self.addr).await.unwrap();
        loop {
            let mut buf: &mut [u8] = &mut [];
            let (_, _) = socket.recv_from(&mut buf).await.unwrap();

            // I spawn tokio task to enable concurrency
            tokio::spawn(async move {
                // But i can't use &self in here because it's not static.
                let datagram = self.deserialize_datagram(buf).await;
                self.handle_datagram(()).await;
            });
        }
    }

    pub async fn deserialize_datagram(&self, buf: &mut [u8]) -> Datagram {
        unimplemented!()
    }

    pub async fn handle_datagram(&self, datagram: Datagram) {
        unimplemented!()
    }
}

另一个“解决方案”是采用self: Arc<Self>而不是&self,但这种方式甚至更糟糕。为什么你觉得这样更糟糕呢?在我看来,这似乎是一个完全有效的解决方案。 - Aplet123
@Aplet123 我不记得在其他代码库中看到过它的使用,所以我认为它并不是最优的选择。 - Bonanov
还要提一下,这个解决方案需要在将变量移入闭包之前,在代码中放置许多self.clone() - Bonanov
1
如果您不介意使用外部 crate(和宏),closure crate 旨在使克隆发送到闭包的变量更加容易。 - user4815162342
2个回答

5

目前唯一的方法是通过使用 Arc 使 self 持续很长时间。由于 run()UdpServer 上的一个方法,所以它需要将 Arc<Self> 进行更改,你考虑过但却拒绝了这种方式,因为它感觉更糟糕。不过,那就是实现的方法:

pub async fn run(self: Arc<Self>) {
    let socket = UdpSocket::bind(&self.addr).await.unwrap();
    loop {
        let mut buf: &mut [u8] = &mut [];
        let (_, _) = socket.recv_from(&mut buf).await.unwrap();

        tokio::spawn({
            let me = Arc::clone(&self);
            async move {
                let datagram = me.deserialize_datagram(buf).await;
                me.handle_datagram(datagram).await;
            }
        });
    }
}

Playground

有趣的是,smol异步运行时可能正是您要寻找的内容,因为它的执行者携带了一个生命周期。该生命周期与调用者环境中的值相关联,并且在执行者上产生的future可以引用它。例如,以下代码可以编译通过:

use futures_lite::future;
use smol::{Executor, net::UdpSocket};

struct Datagram;

struct UdpServer {
    addr: String,
}

impl UdpServer {
    pub async fn run<'a>(&'a self, ex: &Executor<'a>) {
        let socket = UdpSocket::bind(&self.addr).await.unwrap();
        loop {
            let mut buf: &mut [u8] = &mut [];
            let (_, _) = socket.recv_from(&mut buf).await.unwrap();

            ex.spawn({
                async move {
                    let datagram = self.deserialize_datagram(buf).await;
                    self.handle_datagram(datagram).await;
                }
            }).detach();
        }
    }

    pub async fn deserialize_datagram(&self, _buf: &mut [u8]) -> Datagram {
        unimplemented!()
    }

    pub async fn handle_datagram(&self, _datagram: Datagram) {
        unimplemented!()
    }
}

fn main() {
    let server = UdpServer { addr: "127.0.0.1:8080".to_string() };
    let ex = Executor::new();
    future::block_on(server.run(&ex));
}

1
在你的第一个例子中,我该如何使self可变?我尝试了self: &mut Arc<Self>但没有成功。 - SpaceMonkey
@SpaceMonkey 你可能只需要 (&mut self) - Peter Hansen

1
你说得对。在 Tokio 教程 中提到了这个解决方案:

如果一个数据需要被多个任务并发访问,那么就必须使用同步原语(例如 Arc)来共享它。


这并没有回答问题。一旦您拥有足够的声望,您将能够评论任何帖子;相反,提供不需要询问者澄清的答案。- 来自审核 - Tugrul Ates

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接