在异步移动块中使用 `&mut self`

3

我有一个WriterJob结构体。它的功能并不重要;它所做的是生成一个长时间运行的任务,并提供调用者一个API来检查作业的状态。

pub(crate) struct WriterJob {
    ...,
    status: Status,
}

impl WriterJob {
    async fn writer_job_impl(&mut self, rx: Receiver<WriteCommand>) {
       // Job implementation details. It's important that `self` is mutuable
       // since I update the job status
       // eg.
       // self.status = Status::Ready;
    }

    pub(crate) fn status(&self) -> Status {
       self.status
    }

    pub(crate) fn spawn_writer_job(&mut self) -> Sender<WriteCommand> {
        let (tx, rx) = mpsc::channel(10);

        let handle = tokio::spawn(async move {
            self.writer_job_impl(rx).await;
        });

        self.status = Status::Spawned;

        tx
    }

我遇到了这个错误:
error[E0521]: borrowed data escapes outside of associated function
  --> src/io/buffered_write.rs:92:22
   |
89 |       pub(crate) fn spawn_writer_job(&mut self) -> Sender<WriteCommand> {
   |                                      ---------
   |                                      |
   |                                      `self` is a reference that is only valid in the associated function body
   |                                      let's call the lifetime of this reference `'1`
...
92 |           let handle = tokio::spawn(async move {
   |  ______________________^
93 | |             self.writer_job_impl(rx).await;
94 | |         });
   | |          ^
   | |          |
   | |__________`self` escapes the associated function body here
   |            argument requires that `'1` must outlive `'static`

我认为编译器抱怨它不知道 self 是否会像生成的任务一样长寿,因此出现了生命周期错误。但我不确定如何解决这个问题。一种可能的方法是使用 Arc<Mutex<Status>>Arc<RwLock<Status>>,但我不喜欢这种方法,因为我可能需要在 self 中添加更多可变字段。有更简洁的方法吗?


1
还要考虑一个问题,编译器如何确保在生成的任务运行时不会再次借用 self?它并不知道任务将运行多长时间,因此使其正常工作的唯一方法是通过 移动 self 或使用内部可变性。 - Peter Hall
根据您的需求,您也可以使用[scoped tasks](https://docs.rs/tokio-scoped/latest/tokio_scoped/)。 - Peter Hall
在已经设置的struct上使用pub(crate)可能有些多余。 - tadman
2
status 提取到一个单独的状态结构体中,并用 Arc<Mutex> 包装它。你想要从不同的线程读写它,所以必须以某种方式进行同步。如果你不想使用互斥锁,可以切换到原子操作或原子枚举。 - xamgore
2个回答

1

你不能假设在执行handle期间,self不会被释放,这就是为什么会抛出错误。相反,你可以改变你的结构设计来使其正常工作。

  1. 创建一个包含你需要在线程之间共享的数据的子结构体
struct WriteJobInner {
    ...,
    status: Status,
}

创建用于对着色器数据执行操作的结构体。
// you can use 'std::sync::Mutex' if you need to touch the data in non-async
// functions
use tokio::sync::Mutex;
use std::sync::Arc;

#[derive(Clone)]
pub(crate) struct Writejob {
    inner: Arc<Mutex<WriteJobInner>>,
}

另外,你可能可以用 tokio::sync::RwLock 替换 tokio 的互斥锁,如果有多个线程在数据上执行不可变操作,则该锁更好,让许多线程读取数据而不阻塞其他只读线程。

  1. 最后一步是实现 WriteJob 函数,你可以使用 &self 而不是 &mut self ,因为你使用互斥锁保护修改数据。
impl WriterJob {
    async fn writer_job_impl(&self, rx: Receiver<WriteCommand>) {
       // Job implementation details. It's important that `self` is mutuable
       // since I update the job status
       // eg.
       // *self.inner.lock().await.status = Status::Ready;
    }

    pub(crate) async fn status(&self) -> Status {
       // make sure the `status` implements copy trait
       *self.inner.lock().await.status
    }

    pub(crate) fn spawn_writer_job(&self) -> Sender<WriteCommand> {
        let (tx, rx) = mpsc::channel(10);
        // creates a new instance `Self` but all the instances of `self.inner`
        // references to the same shared state because it's wrapped by the
        // shader pointer `Arc`
        let writer = self.clone();

        let handle = tokio::spawn(async move {
            writer.writer_job_impl(rx).await;
        });

        *self.inner.status.lock().await = Status::Spawned;

        tx
    }
}

0
一种可能的解决方案是使用内部可变性。通过将Status字段更改为Cell<Status> - 或原子类型,例如crossbeam的crossbeam::atomic::AtomicCell - 您可以删除mut和生命周期限制。
假设您将status字段移动到内部类型中;我称之为WriterJobInner。现在,WriterJob仅拥有一个Arc<WriterJobInner>
pub(crate) struct WriterJob {
    inner: std::sync::Arc<WriterJobInner>,
}

struct WriterJobInner {
    pub status: std::cell::Cell<Status>,
}

struct WriteCommand;

#[derive(Copy, Clone)]
enum Status {
    None,
    Spawned,
    Ready,
}

如果你想的话,你可以为WriterJob实现Deref以简化一些访问。

你的WriterJob实现现在将更改为:

impl WriterJob {
    pub(crate) fn spawn_writer_job(&self) -> tokio::sync::mpsc::Sender<WriteCommand> {
        let (tx, rx) = tokio::sync::mpsc::channel(10);

        // Clone the Arc and move it to the background thread.
        let inner = self.inner.clone();
        let handle = tokio::spawn(async move {
            inner.writer_job_impl(rx).await;
        });

        // TODO: Set only to Spawned if still None
        self.inner.status.set(Status::Spawned);
        tx
    }

    pub(crate) fn status(&self) -> Status {
        self.inner.status()
    }
}

由于线程不需要传递self - 无论是可变的还是不可变的 - 错误就消失了。此外,由于status现在是一个Cell,它也不需要mut self

同样,您的WriterJobInner也只需要&self

impl WriterJobInner {
    pub async fn writer_job_impl(&self, rx: tokio::sync::mpsc::Receiver<WriteCommand>) {
        // ...
        self.status.set(Status::Ready);
    }

    pub fn status(&self) -> Status {
        self.status.get()
    }
}

unsafe impl Send for WriterJobInner {}
unsafe impl Sync for WriterJobInner {}

不足的是,WriterJobInner 类型需要同时满足 SendSync 才能与 Arc<T> 一起使用,但你仍然在跨线程中使用它。

请注意,在创建线程后将 status 设置为 Spawned 是一种竞态条件。您可以尝试仅在其未被设置为其他值时原子性地设置该值。


在这种方法中,最有用的混合方式可能是将需要一起更改的所有内容捆绑在一起,然后对其使用外部可变性。

在此之上使用RwLock(或类似物),例如作为inner: Arc<RwLock<..>>,需要您将同步代码与异步代码混合使用(在async方法中使用std::sync::RwLock)或使您的访问器成为async(当使用tokio::sync::RwLock时)。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接