当我在使用Tokio和std::sync::Mutex时,为什么会出现死锁?

6

当使用Tokio时,我遇到了死锁条件:

use tokio::time::{delay_for, Duration};
use std::sync::Mutex;

#[tokio::main]
async fn main() {
    let mtx = Mutex::new(0);

    tokio::join!(work(&mtx), work(&mtx));

    println!("{}", *mtx.lock().unwrap());
}

async fn work(mtx: &Mutex<i32>) {
    println!("lock");
    {
        let mut v = mtx.lock().unwrap();
        println!("locked");
        // slow redis network request
        delay_for(Duration::from_millis(100)).await;
        *v += 1;
    }
    println!("unlock")
}

产生以下输出,然后永远挂起。
lock
locked
lock

根据Tokio文档,在异步代码中使用std::sync::Mutex是可以的。与普遍观点相反,通常情况下应使用标准库中的普通Mutex。
然而将Mutex替换为tokio::sync::Mutex不会触发死锁,并且一切正常“按预期”运行,但仅适用于上述示例情况。在真实场景中,如果延迟由某个Redis请求引起,则仍会失败。
我认为这可能是因为我根本没有生成线程,因此,即使是“并行”执行,由于await只是暂停执行,所以会在同一线程上锁定。
在不生成单独线程的情况下,如何以Rust语言方式实现我想要的效果?

你不能仅仅读取那一个句子。async mutex 相比于 blocking mutex 的特点是可以在 .await 点保持锁定状态,你的代码明显在锁定状态下调用了 .await 方法。 - Shepmaster
嗯,也许问题的措辞完全错误了。如果我使用tokio::sync::Mutex,上面的代码是否可以? - Konstantin W
1个回答

6
在这里使用 std::sync::Mutex不可行的 的原因是你在 .await 点上持有它。在这种情况下:
  • 任务1持有 Mutex,但在 delay_for 上被挂起。
  • 任务2被调度并运行,但无法获取 Mutex,因为它仍然由任务1拥有。它将同步阻塞在获取 Mutex 上。
由于任务2被阻塞,这也意味着运行时线程完全被阻塞。它实际上无法进入其计时器处理状态(当运行时处于空闲状态且不处理用户任务时会发生此情况),因此无法恢复任务1。
因此,你现在观察到了死锁。
==> 如果你需要在 .await 点上持有 Mutex,你必须使用异步 Mutex。像 tokio 文档所描述的那样,同步 Mutex 可以用于异步程序 - 但不能跨越 .await 点。

1
那么,在我的情况下使用tokio::sync::Mutex是正确的方法,对吗?在我的实际代码中,我正在等待获取流的项目,然后我正在等待tokio互斥锁。由于某种原因,tokio似乎会忘记一些等待互斥锁的未来(只要有多个未来在等待互斥锁???)。因此,这可能是一个错误,而不是我的错(我应该问这个问题...) - Konstantin W
如果您需要在.await点上持有互斥锁,任何异步互斥锁实现(例如tokio、futures-intrusive、async-std、futures-rs等)都应该能够满足此需求。也许您还可以重构程序以避免在.await点上持有互斥锁。关于“tokio忘记等待互斥锁”的问题,我不确定。听起来像是一个错误或用法错误。 - Matthias247
使用@KonstantinW或者tokio::spawn来运行你的“work”任务,这样它们就可以独立于父任务并被安排到不同的线程中。你目前的代码是单线程的,所有的内容都在同一个线程中运行,所以当lock()阻塞时,什么也不会运行了。 - stepan

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接