我面临这样一种情况,需要在对象的drop处理程序中运行异步代码。整个应用程序在tokio异步上下文中运行,因此我知道drop处理程序是在活动的tokio Runtime下调用的,但不幸的是,drop本身是同步函数。
理想情况下,我希望找到一个既适用于多线程又适用于当前线程运行时的解决方案,但如果不存在这样的解决方案,那么我可以接受一个在dropping线程上阻塞并依赖其他线程驱动futures的解决方案。
我考虑了多个选项,但不确定哪种方法最好或者它们之间的权衡。对于这些示例,假设我的类有一个async terminate(&mut self)
函数,我希望从drop()
中调用它。
struct MyClass;
impl MyClass {
async fn terminate(&mut self) {}
}
Option 1: tokio::runtime::Handle::block_on
impl Drop for MyClass {
fn drop(&mut self) {
tokio::runtime::Handle::current().block_on(self.terminate());
}
}
这似乎是最简单的方法,但不幸的是它会导致恐慌。
Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.
请查看playground
我有些困惑,因为我以为Handle::block_on
会使用当前正在运行的运行时,但它似乎尝试启动一个新的运行时?这是怎么回事?
此外,根据Handle::block_on
的文档,它不能驱动IO线程。所以,阻塞这个线程是有风险的——如果太多的对象同时被销毁,每个对象都阻塞了一个线程,并且那些期待IO工作的futures,那么这将导致死锁。
选项2:futures::executor::block_on
impl Drop for MyClass {
fn drop(&mut self) {
futures::executor::block_on(self.terminate());
}
}
请看playground。
这种方法似乎运行良好。如果我理解正确,它会在当前线程上产生一个新的非tokio执行器,并使用该线程驱动未来。这会有问题吗?这会导致正在运行的tokio执行器和新的futures执行器之间发生冲突吗?
此外,这种方法是否可以实际驱动IO线程,从而避免选项1中的问题?或者可能会发生那些IO线程仍在等待tokio执行器的情况吗?
选项3:
tokio::task::spawn
with futures::executor::block_on
impl Drop for MyClass {
fn drop(&mut self) {
let task = tokio::task::spawn(self.terminate());
futures::executor::block_on(task);
}
}
请参见playground
这样做应该可以让tokio运行时驱动终止未来,而futures运行时仅阻塞当前线程等待tokio运行时完成。这比选项2更安全,并且在运行时之间引起的冲突更少吗?不幸的是,这遇到了一个我无法解决的生命周期问题。
error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
--> src/main.rs:8:44
|
7 | fn drop(&mut self) {
| --------- this data with an anonymous lifetime `'_`...
8 | let task = tokio::task::spawn(self.terminate());
| ---- ^^^^^^^^^
| |
| ...is used here...
|
note: ...and is required to live as long as `'static` here
--> src/main.rs:8:20
|
8 | let task = tokio::task::spawn(self.terminate());
| ^^^^^^^^^^^^^^^^^^
note: `'static` lifetime requirement introduced by this bound
--> /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:28
|
127 | T: Future + Send + 'static,
| ^^^^^^^
我也试过使用LocalSet
来解决这个问题,但无法使其工作。有什么办法可以让它起作用吗?
选项3b
然而,如果我让terminate()
通过值来获取self
并将MyClass
包装成Wrapper
,我就能够让它工作。虽然不是很美观,但可能比选项2更好,因为它使用了tokio运行时来驱动未来?
struct MyClass;
impl MyClass {
async fn terminate(self) {}
}
struct Wrapper(Option<MyClass>);
impl Drop for Wrapper {
fn drop(&mut self) {
if let Some(v) = self.0.take() {
let task = tokio::task::spawn(v.terminate());
futures::executor::block_on(task).unwrap();
}
}
}
请参见playground
这是一个好的方案吗?Tokio运行时是否真的驱动放弃的future,还是更简单的Option 2更好?有没有办法使option 3b更美观/易于使用?
选项4:后台任务
我在https://stackoverflow.com/a/68851788/829568上找到了这个选项。它基本上在对象的构造函数中生成一个后台任务,等待触发并在触发时运行异步丢弃代码。然后,放弃实现将其触发并运行忙等待循环,直到完成为止。
这似乎过于复杂,也比这里的其他选项更容易出错。或者这实际上是最佳解决方案吗?
关于耗尽工作线程的附加问题
除了选项1外,所有这些选项都会阻塞tokio工作线程以等待异步丢失完成。在多线程运行时中,大多数情况下都会很好,但理论上,如果多个析构函数并行运行,它们可能会锁定所有工作线程 - 如果我理解正确,则我们将陷入死锁状态,没有线程会取得进展。选项1似乎有些更好,但block_on
文档说它只能驱动非IO future。因此,如果太多的析构函数执行IO工作,它仍然可能会被锁定。有没有办法告诉tokio增加一个工作线程的数量?如果我们对每个线程都阻塞增加一个线程,是否可以避免这个问题?
选项5:在新线程中创建新运行时
impl Drop for MyClass {
fn drop(&mut self) {
std::thread::scope(|s| {
s.spawn(|| {
let runtime = tokio::runtime::Builder::new_multi_thread()
.build()
.unwrap();
runtime.block_on(self.terminate());
});
});
}
}
请看playground。
这段代码似乎工作正常,并试图避免阻塞工作线程,通过在新线程上的新运行时中运行drop任务。这个新线程应该能够驱动IO任务。但是,这是否真正完全解决了问题?如果drop任务依赖于正在主tokio执行器上运行的IO任务会怎么样呢?我认为这仍有可能导致程序无限期地锁定。
tokio
资产并将它们发送到另一个运行时实例,但我怀疑不能。我认为你应该尝试使用一个不那么琐碎的示例,例如在terminate
中放置一个tokio
计时器调用。这将更好地演示什么能够工作和不能工作。 - PitaJfutures::block_on
执行器与tokio
API 不兼容。 - PitaJ