Rust异步释放

21

我面临这样一种情况,需要在对象的drop处理程序中运行异步代码。整个应用程序在tokio异步上下文中运行,因此我知道drop处理程序是在活动的tokio Runtime下调用的,但不幸的是,drop本身是同步函数。

理想情况下,我希望找到一个既适用于多线程又适用于当前线程运行时的解决方案,但如果不存在这样的解决方案,那么我可以接受一个在dropping线程上阻塞并依赖其他线程驱动futures的解决方案。

我考虑了多个选项,但不确定哪种方法最好或者它们之间的权衡。对于这些示例,假设我的类有一个async terminate(&mut self)函数,我希望从drop()中调用它。

struct MyClass;
impl MyClass {
    async fn terminate(&mut self) {}
}

Option 1: tokio::runtime::Handle::block_on

impl Drop for MyClass {
    fn drop(&mut self) {
        tokio::runtime::Handle::current().block_on(self.terminate());
    }
}

这似乎是最简单的方法,但不幸的是它会导致恐慌。
Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.

请查看playground

我有些困惑,因为我以为Handle::block_on会使用当前正在运行的运行时,但它似乎尝试启动一个新的运行时?这是怎么回事?

此外,根据Handle::block_on的文档,它不能驱动IO线程。所以,阻塞这个线程是有风险的——如果太多的对象同时被销毁,每个对象都阻塞了一个线程,并且那些期待IO工作的futures,那么这将导致死锁。

选项2:futures::executor::block_on

impl Drop for MyClass {
    fn drop(&mut self) {
        futures::executor::block_on(self.terminate());
    }
}

请看playground
这种方法似乎运行良好。如果我理解正确,它会在当前线程上产生一个新的非tokio执行器,并使用该线程驱动未来。这会有问题吗?这会导致正在运行的tokio执行器和新的futures执行器之间发生冲突吗?
此外,这种方法是否可以实际驱动IO线程,从而避免选项1中的问题?或者可能会发生那些IO线程仍在等待tokio执行器的情况吗?
选项3:tokio::task::spawn with futures::executor::block_on
impl Drop for MyClass {
    fn drop(&mut self) {
        let task = tokio::task::spawn(self.terminate());
        futures::executor::block_on(task);
    }
}

请参见playground

这样做应该可以让tokio运行时驱动终止未来,而futures运行时仅阻塞当前线程等待tokio运行时完成。这比选项2更安全,并且在运行时之间引起的冲突更少吗?不幸的是,这遇到了一个我无法解决的生命周期问题。

error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
   --> src/main.rs:8:44
    |
7   |     fn drop(&mut self) {
    |             --------- this data with an anonymous lifetime `'_`...
8   |         let task = tokio::task::spawn(self.terminate());
    |                                       ---- ^^^^^^^^^
    |                                       |
    |                                       ...is used here...
    |
note: ...and is required to live as long as `'static` here
   --> src/main.rs:8:20
    |
8   |         let task = tokio::task::spawn(self.terminate());
    |                    ^^^^^^^^^^^^^^^^^^
note: `'static` lifetime requirement introduced by this bound
   --> /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:28
    |
127 |         T: Future + Send + 'static,
    |                            ^^^^^^^

我也试过使用LocalSet来解决这个问题,但无法使其工作。有什么办法可以让它起作用吗?

选项3b

然而,如果我让terminate()通过值来获取self并将MyClass包装成Wrapper,我就能够让它工作。虽然不是很美观,但可能比选项2更好,因为它使用了tokio运行时来驱动未来?

struct MyClass;
impl MyClass {
  async fn terminate(self) {}
}

struct Wrapper(Option<MyClass>);

impl Drop for Wrapper {
    fn drop(&mut self) {
        if let Some(v) = self.0.take() {
            let task = tokio::task::spawn(v.terminate());
            futures::executor::block_on(task).unwrap();
        }
    }
}

请参见playground

这是一个好的方案吗?Tokio运行时是否真的驱动放弃的future,还是更简单的Option 2更好?有没有办法使option 3b更美观/易于使用?

选项4:后台任务

我在https://stackoverflow.com/a/68851788/829568上找到了这个选项。它基本上在对象的构造函数中生成一个后台任务,等待触发并在触发时运行异步丢弃代码。然后,放弃实现将其触发并运行忙等待循环,直到完成为止。

这似乎过于复杂,也比这里的其他选项更容易出错。或者这实际上是最佳解决方案吗?

关于耗尽工作线程的附加问题

除了选项1外,所有这些选项都会阻塞tokio工作线程以等待异步丢失完成。在多线程运行时中,大多数情况下都会很好,但理论上,如果多个析构函数并行运行,它们可能会锁定所有工作线程 - 如果我理解正确,则我们将陷入死锁状态,没有线程会取得进展。选项1似乎有些更好,但block_on文档说它只能驱动非IO future。因此,如果太多的析构函数执行IO工作,它仍然可能会被锁定。有没有办法告诉tokio增加一个工作线程的数量?如果我们对每个线程都阻塞增加一个线程,是否可以避免这个问题?

选项5:在新线程中创建新运行时

impl Drop for MyClass {
    fn drop(&mut self) {
        std::thread::scope(|s| {
            s.spawn(|| {
                let runtime = tokio::runtime::Builder::new_multi_thread()
                    .build()
                    .unwrap();
                runtime.block_on(self.terminate());
            });
        });
    }
}

请看playground
这段代码似乎工作正常,并试图避免阻塞工作线程,通过在新线程上的新运行时中运行drop任务。这个新线程应该能够驱动IO任务。但是,这是否真正完全解决了问题?如果drop任务依赖于正在主tokio执行器上运行的IO任务会怎么样呢?我认为这仍有可能导致程序无限期地锁定。

异步丢弃是一个未解决的问题。我不知道你是否可以从一个运行时实例中获取tokio资产并将它们发送到另一个运行时实例,但我怀疑不能。我认为你应该尝试使用一个不那么琐碎的示例,例如在terminate中放置一个tokio计时器调用。这将更好地演示什么能够工作和不能工作。 - PitaJ
例如,我认为 futures::block_on 执行器与 tokio API 不兼容。 - PitaJ
2个回答

4

选项1:tokio::runtime::Handle::block_on

block_on函数是tokio运行时的入口点;例如,当您使用#[tokio::main]进行注释时,它就会运行。如果这样做可以工作,tokio将会生成一个全新的运行时,并阻塞当前线程直到其完成。您绝对不希望发生这种情况!

选项2:futures::executor::block_on

这个方法可以工作,但是会阻塞,因此不是理想的选择,因为在它完成之前,该线程上的其他任务无法取得进展。

选项3:tokio::task::spawnfutures::executor::block_on

在这里您不需要使用block_on;生成一个任务将使该任务运行到完成。不需要阻塞任何线程!这就是我会做的事情。但是,您注意到了一个问题,如果编译器允许这样做,那么它将导致内存错误。让我们假装我们可以这样做:

  1. 我们有一个名为 foo: MyClass 的变量。
  2. foo 被丢弃了。
  3. 我们生成了一个任务,并将其引用传递给 foo.terminate() 方法。
  4. foo 不再存在,但是我们有一个引用后台任务!最好的情况是 seg-fault。

那么我们该如何避免这种情况呢?这就引出了 Option 3b。

Option 3b

我认为这是一个很好的解决方案(再次强调,不使用 block_on)。

如果 MyClass 有一个廉价的 default() 实现,那么你就不需要包装器,并且可以将其替换为默认值。我的第一个想法是调用 std::mem::take,它会留下一个默认值,但是这会遇到一个问题;在调用 drop 方法时会导致堆栈溢出。所以,我们可以使用一个标志来指示已经被删除:

#[derive(Default)]
struct MyClass {
    dropped: bool,
}

impl MyClass {
    async fn terminate(&mut self) {
        println!("Terminating");
    }
}

impl Drop for MyClass {
    fn drop(&mut self) {
        if !self.dropped {
            let mut this = MyClass::default();
            std::mem::swap(&mut this, self);
            this.dropped = true;
            tokio::spawn(async move { this.terminate().await });
        }
    }
}

如果你发现自己经常需要使用这个,你可以创建一个Dropper包装器来与各种类型一起使用:

#[async_trait::async_trait]
pub trait AsyncDrop {
    async fn async_drop(&mut self);
}

#[derive(Default)]
pub struct Dropper<T: AsyncDrop + Default + Send + 'static> {
    dropped: bool,
    inner: T,
}

impl<T: AsyncDrop + Default + Send + 'static> Dropper<T> {
    pub fn new(inner: T) -> Self {
        Self {
            dropped: false,
            inner,
        }
    }
}

impl<T: AsyncDrop + Default + Send + 'static> Drop for Dropper<T> {
    fn drop(&mut self) {
        if !self.dropped {
            let mut this = Dropper::default();
            std::mem::swap(&mut this, self);
            this.dropped = true;

            tokio::spawn(async move {
                this.inner.async_drop().await;
            });
        }
    }
}

选项4:后台任务

这已经被另一个答案涵盖了:https://dev59.com/POk5XIcBkEYKwwoY2NHV#71741467

选项5:在新线程中创建新的运行时

我绝对不会每次想要退出时都生成一个新的运行时;那样非常笨重。

通过使用作用域线程,您也无法解决阻塞问题。线程将在作用域结束时加入,这是立即的,并阻塞直到运行时完成。


关于选项3,似乎有一个替代方案可以为结构体派生Clone,并在drop中克隆其数据以发送到新线程。(适用于某些情况) - Venryx
为什么它不能保证滴液的顺序?虽然它最终会按预期滴下,但如果我有多个滴液,它不会按照相反的顺序滴下。总是不同的。 - undefined
为什么不能保证滴落的顺序?如果你在谈论Option 3b,我们会生成一个任务来执行滴落操作。任务是并行执行的,无法保证执行顺序。如果你需要保证顺序,可以使用一个通道和一个长期存在的后台任务,从通道中读取并按顺序进行滴落操作。 - undefined

1
如果你想要“做某事”,但是不想独占 MyClass 的可变访问权限,也许使用 oneshot 通道来触发异步计算会起到作用?这与选项 #4 类似。

你也可以通过通道发送一些额外的状态。

use std::time::Duration;

use tokio::{
    runtime::Runtime,
    sync::oneshot::{self, Receiver, Sender},
    time::interval,
};

struct MyClass {
    tx: Option<Sender<()>>, // can have SomeStruct instead of () 
    // my_state: Option<SomeStruct>
}

impl MyClass {
    pub async fn new() -> Self {
        println!("MyClass::new()");

        let (tx, mut rx) = oneshot::channel();

        tokio::task::spawn(async move {
            let mut interval = interval(Duration::from_millis(100));

            println!("drop wait loop starting...");

            loop {
                tokio::select! {
                    _ = interval.tick() => println!("Another 100ms"),
                    msg = &mut rx => {
                        println!("should process drop here");
                        break;
                    }
                }
            }
        });

        Self { tx: Some(tx) }
    }
}

impl Drop for MyClass {
    fn drop(&mut self) {
        println!("drop()");
        self.tx.take().unwrap().send(()).unwrap();
        // self.tx.take().unwrap().send(self.my_state.take().unwrap()).unwrap(); 
    }
}

#[tokio::main]
async fn main() {
    let class = MyClass::new().await;
}

这通常会打印出:

MyClass::new()
drop()
drop wait loop starting...
should process drop here

有时候,在接收方任务有机会生成之前,进程就已经存在了。但是如果你有一个非退出代码,应该没问题。
不确定 select! interval.tick 是否必要,不过遗憾的是,单次触发通道没有异步阻塞接收方法。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接