Rust By Example中的通道是如何工作的?

7

我对于《Rust by Example》中的通道章节的输出感到相当困惑:

use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::thread;

static NTHREADS: i32 = 3;

fn main() {
    // Channels have two endpoints: the `Sender<T>` and the `Receiver<T>`,
    // where `T` is the type of the message to be transferred
    // (type annotation is superfluous)
    let (tx, rx): (Sender<i32>, Receiver<i32>) = mpsc::channel();

    for id in 0..NTHREADS {
        // The sender endpoint can be copied
        let thread_tx = tx.clone();

        // Each thread will send its id via the channel
        thread::spawn(move || {
            // The thread takes ownership over `thread_tx`
            // Each thread queues a message in the channel
            thread_tx.send(id).unwrap();

            // Sending is a non-blocking operation, the thread will continue
            // immediately after sending its message
            println!("thread {} finished", id);
        });
    }

    // Here, all the messages are collected
    let mut ids = Vec::with_capacity(NTHREADS as usize);
    for _ in 0..NTHREADS {
        // The `recv` method picks a message from the channel
        // `recv` will block the current thread if there no messages available
        ids.push(rx.recv());
    }

    // Show the order in which the messages were sent
    println!("{:?}", ids);
}

使用默认的NTHREADS = 3,我得到了以下输出:

thread 2 finished
thread 1 finished
[Ok(2), Ok(1), Ok(0)]

for循环中的println!("thread {} finished", id);为什么会以相反的顺序打印?thread 0 finished去哪了?

当我将NTHREADS = 8更改后,发生了更神秘的事情:

thread 6 finished
thread 7 finished
thread 8 finished
thread 9 finished
thread 5 finished
thread 4 finished
thread 3 finished
thread 2 finished
thread 1 finished
[Ok(6), Ok(7), Ok(8), Ok(9), Ok(5), Ok(4), Ok(3), Ok(2), Ok(1), Ok(0)]

打印顺序让我更加困惑,而且线程0总是缺失的。如何解释这个例子?

我在不同的电脑上尝试了这个例子,结果都一样。


NTHREADS = 8,我看到 线程9已完成。你有什么想法吗?显然代码无法处理简单的计数,这似乎是一个更严重的问题。 - Shepmaster
1
你是否使用了NTHREADS=10而不是8来设置输出? - Chris Emerson
1个回答

11

线程之间没有保证的顺序或协调,因此它们会以任意顺序执行并将结果发送到通道中。这正是重点 - 如果它们是独立的,则可以使用多个线程。

主线程从通道中获取 N 值,将它们放入 Vec 中,打印出 Vec 并退出。

主线程在退出之前不会等待子线程完成。最后一个线程将值发送到通道中,并由主线程读取它(结束 for 循环),然后程序退出。线程不会有机会打印其已完成。

还可能存在最后一个线程在主线程恢复并退出之前有机会运行和打印输出的情况。

每种情况可能更或少可能发生,具体取决于 CPU 数量或操作系统,但两者都是程序的正确运行方式。

修改后等待线程的代码版本显示不同的输出:

use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::thread;

static NTHREADS: i32 = 3;

fn main() {
    // Channels have two endpoints: the `Sender<T>` and the `Receiver<T>`,
    // where `T` is the type of the message to be transferred
    // (type annotation is superfluous)
    let (tx, rx): (Sender<i32>, Receiver<i32>) = mpsc::channel();

    let handles: Vec<_> = (0..NTHREADS).map(|id| {
        // The sender endpoint can be copied
        let thread_tx = tx.clone();

        // Each thread will send its id via the channel
        thread::spawn(move || {
            // The thread takes ownership over `thread_tx`
            // Each thread queues a message in the channel
            thread_tx.send(id).unwrap();

            // Sending is a non-blocking operation, the thread will continue
            // immediately after sending its message
            println!("thread {} finished", id);
        })
    }).collect();

    // Here, all the messages are collected
    let mut ids = Vec::with_capacity(NTHREADS as usize);
    for _ in 0..NTHREADS {
        // The `recv` method picks a message from the channel
        // `recv` will block the current thread if there no messages available
        ids.push(rx.recv());
    }

    // Show the order in which the messages were sent
    println!("{:?}", ids);

    // Wait for threads to complete
    for handle in handles {
        handle.join().expect("Unable to join");
    }
}

请注意,在这种情况下,主线程在最后一个线程退出之前打印输出:
thread 2 finished
thread 1 finished
[Ok(2), Ok(1), Ok(0)]
thread 0 finished

这四行代码的顺序无关紧要:子线程打印在主线程之前或之后都没有问题,因此这些代码可以以任何顺序出现。

1
谢谢,这非常有帮助! - asdetrefle

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接