Rust的互斥锁为什么似乎不会将锁分配给最后想要获得锁的线程?

10

我想编写一个程序,它会生成两个线程,锁定一个Mutex,增加它的值,打印一些内容,然后解锁Mutex,以便另一个线程可以执行相同操作。我添加了一些休眠时间使其更加一致,所以我认为输出应该是这样的:

ping pong ping pong …  

但实际输出相当随机。大多数情况下,它只是

ping ping ping … pong 

但是完全没有一致性;有时候中间也会出现“pong”。

我曾经认为互斥锁有某种确定最后想要锁定它的人的方式,但看起来并非如此。

  1. 锁定实际上是如何工作的?
  2. 如何获得所需的输出?
use std::sync::{Arc, Mutex};
use std::{thread, time};

fn main() {
    let data1 = Arc::new(Mutex::new(1));
    let data2 = data1.clone();
    let ten_millis = time::Duration::from_millis(10);

    let a = thread::spawn(move || loop {
        let mut data = data1.lock().unwrap();
        thread::sleep(ten_millis);
        println!("ping ");
        *data += 1;
        if *data > 10 {
            break;
        }
    });

    let b = thread::spawn(move || loop {
        let mut data = data2.lock().unwrap();
        thread::sleep(ten_millis);
        println!("pong ");
        *data += 1;
        if *data > 10 {
            break;
        }
    });

    a.join().unwrap();
    b.join().unwrap();
}
3个回答

12

MutexRwLock都使用操作系统特定的原语,并不能保证公平性。在Windows上,它们都是使用SRW locks实现的,这些锁被明确记录为公平。我没有为其他操作系统做过研究,但你绝对不能指望std::sync::Mutex具有公平性,特别是如果你需要这段代码可移植。

Rust中的一个可能的解决方案是由parking_lot crate提供的Mutex实现,它提供了一个unlock_fair方法,该方法采用公平算法实现。

来自parking_lot documentation:

默认情况下,互斥锁是不公平的,并允许当前线程在其他线程有机会获取锁之前重新锁定互斥锁,即使该线程已经长时间地被阻塞在互斥锁上。这是默认设置,因为它可以实现更高的吞吐量,避免在每次互斥解锁时强制进行上下文切换。这可能导致一个线程比其他线程更多次地获取互斥锁。
然而,在某些情况下,如果有等待的线程,强制将锁传递给等待的线程可能是有益的。这可以通过使用此方法而不是正常放弃MutexGuard来完成。
虽然parking_lot::Mutex没有声明在不特别使用unlock_fair方法的情况下是公平的,但我发现只需进行playground中的转换(甚至不使用unlock_fair方法),您的代码产生了与乒乓球相同数量的ping和pong。
通常,当guard超出作用域时,互斥锁会自动解锁。要使其公平解锁,需要在guard被删除之前插入此方法调用:
let b = thread::spawn(move || loop {
    let mut data = data1.lock();
    thread::sleep(ten_millis);
    println!("pong ");
    *data += 1;
    if *data > 10 {
        break;
    }
    MutexGuard::unlock_fair(data);
});

6
互斥锁的加锁顺序是不被保证的。第一个线程有可能100%获得锁,第二个线程却0%获得锁。
线程的调度由操作系统控制,以下情况是完全可能的:
1. 操作系统给予第一个线程 CPU 时间,它获取到锁。 2. 操作系统给予第二个线程 CPU 时间,但锁已被占用,因此它进入睡眠状态。 3. 第一个线程释放了锁,但仍被操作系统允许运行。它继续循环并重新获取锁。 4. 因为锁仍被占用,另一个线程无法继续执行。
如果你给第二个线程更多的时间来获取锁,就会看到预期的“乒乓”模式,但这并不是保证的(糟糕的操作系统可能决定永远不给某些线程 CPU 时间)。
use std::sync::{Arc, Mutex};
use std::{thread, time};

fn main() {
    let data1 = Arc::new(Mutex::new(1));
    let data2 = data1.clone();

    let ten_millis = time::Duration::from_millis(10);

    let a = thread::spawn(move || loop {
        let mut data = data1.lock().unwrap();
        *data += 1;
        if *data > 10 {
            break;
        }

        drop(data);
        thread::sleep(ten_millis);
        println!("ping ");
    });

    let b = thread::spawn(move || loop {
        let mut data = data2.lock().unwrap();
        *data += 1;
        if *data > 10 {
            break;
        }

        drop(data);
        thread::sleep(ten_millis);
        println!("pong ");
    });

    a.join().unwrap();
    b.join().unwrap();
}

你可以通过调整睡眠时间来验证这一点。睡眠时间越短,乒乓交替就会更加不规则,当时间低至10毫秒时,你可能会看到ping-ping-pong等结果。
基于时间的解决方案本质上是设计不良的。你可以通过改进算法来保证“ping”后面跟着“pong”。例如,你可以在奇数上打印“ping”,在偶数上打印“pong”。
use std::sync::{Arc, Mutex};
use std::{thread, time};

const MAX_ITER: i32 = 10;

fn main() {
    let data1 = Arc::new(Mutex::new(1));
    let data2 = data1.clone();

    let ten_millis = time::Duration::from_millis(10);

    let a = thread::spawn(move || 'outer: loop {
        loop {
            thread::sleep(ten_millis);
            let mut data = data1.lock().unwrap();
            if *data > MAX_ITER {
                break 'outer;
            }

            if *data & 1 == 1 {
                *data += 1;
                println!("ping ");
                break;
            }
        }
    });

    let b = thread::spawn(move || 'outer: loop {
        loop {
            thread::sleep(ten_millis);
            let mut data = data2.lock().unwrap();
            if *data > MAX_ITER {
                break 'outer;
            }

            if *data & 1 == 0 {
                *data += 1;
                println!("pong ");
                break;
            }
        }
    });

    a.join().unwrap();
    b.join().unwrap();
}

这并不是最好的实现方式,但我试图在尽可能少地修改原始代码的情况下进行。

你也可以考虑使用 Condvar 进行实现,这是一种更好的解决方案,在我的看法中,它避免了在互斥锁上进行忙等待的问题(备注:也删除了代码重复):

use std::sync::{Arc, Mutex, Condvar};
use std::thread;

const MAX_ITER: i32 = 10;

fn main() {
    let cv1 = Arc::new((Condvar::new(), Mutex::new(1)));
    let cv2 = cv1.clone();

    let a = thread::spawn(ping_pong_task("ping", cv1, |x| x & 1 == 1));
    let b = thread::spawn(ping_pong_task("pong", cv2, |x| x & 1 == 0));

    a.join().unwrap();
    b.join().unwrap();
}

fn ping_pong_task<S: Into<String>>(
        msg: S, 
        cv: Arc<(Condvar, Mutex<i32>)>, 
        check: impl Fn(i32) -> bool) -> impl Fn() 
{
    let message = msg.into();

    move || {
        let (condvar, mutex) = &*cv;

        let mut value = mutex.lock().unwrap();
        loop {
            if check(*value) {
                println!("{} ", message);
                *value += 1;
                condvar.notify_all();
            }

            if *value > MAX_ITER {
                break;
            }

            value = condvar.wait(value).unwrap();
        }
    }
}

4

我曾认为互斥锁有某种方式来确定谁最后想要锁定它,但看起来似乎并非如此。

不是这样的。互斥锁的作用只是尽可能快地运行代码。交替锁定会导致最差的性能,因为你不断地清空CPU高速缓存。你要求的是最糟糕的互斥锁实现。

锁定实际上是如何工作的?

调度程序尽可能多地完成工作。你的工作是编写仅执行你真正想完成的工作的代码。

如何获得所需的输出?

如果你只想做一件事,然后做一些其他的事情,然后再做第一件事,不要使用两个线程。当你不关心工作完成的顺序,只想尽可能多地完成工作时,使用线程。


2
不要使用两个线程,如果你只是想做一件事情,然后做另一件事情,再回到第一件事情。换句话说,当工作可以并行处理时,使用线程。如果使用互斥锁将工作序列化,则使用线程没有意义,这样的性能比在一个线程上完成所有工作还要差。 - Matthieu M.
2
我非常确定这只是一个练习(或作业),旨在熟悉并发原语,因此在这种情况下,“不要使用线程”不是一个恰当的答案。 - Svetlin Zarev
1
@SvetlinZarev 如果这是一项练习,那么在一个人对线程的理解还不够深入时就进行这样的练习是不好的,因为它没有充分利用线程的实际用途。我见过一些不幸的情况,人们在理解如何正确使用线程之前就被分配了这样的任务,我认为指出这个练习是不现实的,并且往往会导致学习错误的东西仍然非常有帮助。 - David Schwartz

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接