如何使用mpsc通道在线程之间创建环形通信?

3
我希望能够创建 n 个线程,并且这些线程之间可以以环形拓扑结构进行通讯,例如,线程 0 可以向线程 1 发送消息,线程 1 可以向线程 2 发送消息,以此类推,而线程 n 可以向线程 0 发送消息。
以下是当 n=3 时我想要实现的示例:
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

let (tx0, rx0): (Sender<i32>, Receiver<i32>) = mpsc::channel();
let (tx1, rx1): (Sender<i32>, Receiver<i32>) = mpsc::channel();
let (tx2, rx2): (Sender<i32>, Receiver<i32>) = mpsc::channel();

let child0 = thread::spawn(move || {
    tx0.send(0).unwrap();
    println!("thread 0 sent: 0");
    println!("thread 0 recv: {:?}", rx2.recv().unwrap());
});
let child1 = thread::spawn(move || {
    tx1.send(1).unwrap();
    println!("thread 1 sent: 1");
    println!("thread 1 recv: {:?}", rx0.recv().unwrap());
});
let child2 = thread::spawn(move || {
    tx2.send(2).unwrap();
    println!("thread 2 sent: 2");
    println!("thread 2 recv: {:?}", rx1.recv().unwrap());
});

child0.join();
child1.join();
child2.join();

我在循环中创建通道,将它们存储在向量中,重新排序发送方,并将其存储在新向量中,然后为每个发送者和接收者(tx1/rx0, tx2/rx1等)对都生成一个自己的线程。

const NTHREADS: usize = 8;

// create n channels
let channels: Vec<(Sender<i32>, Receiver<i32>)> =
    (0..NTHREADS).into_iter().map(|_| mpsc::channel()).collect();

// switch tupel entries for the senders to create ring topology
let mut channels_ring: Vec<(Sender<i32>, Receiver<i32>)> = (0..NTHREADS)
    .into_iter()
    .map(|i| {
        (
            channels[if i < channels.len() - 1 { i + 1 } else { 0 }].0,
            channels[i].1,
        )
    })
    .collect();

let mut children = Vec::new();
for i in 0..NTHREADS {
    let (tx, rx) = channels_ring.remove(i);

    let child = thread::spawn(move || {
        tx.send(i).unwrap();
        println!("thread {} sent: {}", i, i);
        println!("thread {} recv: {:?}", i, rx.recv().unwrap());
    });

    children.push(child);
}

for child in children {
    let _ = child.join();
}

这种方法行不通,因为无法复制 Sender 以创建一个新的向量。 但是,如果我使用引用(& Sender):

let mut channels_ring: Vec<(&Sender<i32>, Receiver<i32>)> = (0..NTHREADS)
    .into_iter()
    .map(|i| {
        (
            &channels[if i < channels.len() - 1 { i + 1 } else { 0 }].0,
            channels[i].1,
        )
    })
    .collect();

我无法启动线程,因为std::sync::mpsc::Sender<i32>不能在线程之间安全地共享。

2个回答

6
SenderReceiver 不能共享,因此您需要将它们移动到各自的线程中。这意味着从 Vec 中删除它们或在迭代时消费 Vec - 即使作为中间步骤,也不允许该向量处于无效状态(有空洞)。使用 into_iter 遍历向量将通过消耗它们来实现。
一个小技巧是,创建两个向量,一个用于发送者,另一个用于接收者,并旋转其中一个向量,以便每个向量的相同索引将给出所需的配对结果。
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

fn main() {
    const NTHREADS: usize = 8;

    // create n channels
    let (mut senders, receivers): (Vec<Sender<i32>>, Vec<Receiver<i32>>) =
        (0..NTHREADS).into_iter().map(|_| mpsc::channel()).unzip();

    // move the first sender to the back
    senders.rotate_left(1);

    let children: Vec<_> = senders
        .into_iter()
        .zip(receivers.into_iter())
        .enumerate()
        .map(|(i, (tx, rx))| {
            thread::spawn(move || {
                tx.send(i as i32).unwrap();
                println!("thread {} sent: {}", i, i);
                println!("thread {} recv: {:?}", i, rx.recv().unwrap());
            })
        })
        .collect();

    for child in children {
        let _ = child.join();
    }
}

很好的答案。您可能还想像我所做的那样使用collect构建children - user4815162342
是的,那样会更好。不过我还是会保留它,而不是对原始代码进行太多更改。 - Peter Hall
1
好的,实际上现在它会让我感到困扰,所以我必须改变它 ;) - Peter Hall
我认为这是有道理的,因为你(正确地)消除了大部分OP的代码,但这取决于你。 :) - user4815162342
我喜欢你使用两个向量的巧妙技巧,然后旋转一个向量,再将它们重新组合在一起。这避免了我最初复制一个向量到另一个向量的问题。但是,你的答案将线程的链接顺序与我想要的相反。将 rotate_right 更改为 rotate_left 可以解决这个问题。(另外,我认为你在编辑时忘记省略 child 变量了) - jhscheer
@jhscheer已经修改了它 :) - Peter Hall

3
这段代码无法运行,因为无法复制Sender以创建新向量。但是,如果使用引用(& Sender):
虽然Sender无法复制,但它确实实现了Clone,因此您可以手动克隆它。但是这种方法对于Receiver不起作用,因为它不是Clone,而且您还需要从向量中提取它。
您的第一段代码存在问题,因为您无法使用let foo = vec[i]仅移出非Copy值向量的一个值。这将使向量处于无效状态,其中一个元素无效,后续访问会导致未定义的行为。为了使其正常工作,Vec需要跟踪哪些元素已移动,哪些没有移动,这将对所有Vec造成成本。因此,Vec禁止将元素移出,而将其留给用户来跟踪移动。

将值从Vec中移出的简单方法是使用Vec<Option<T>>替换Vec<T>并使用Option::takefoo = vec[i]被替换为foo = vec[i].take().unwrap(),它会将vec [i]中选项中的T值(同时断言它不是None)移动,并在向量中留下None,这是Option<T>的有效变体。这是您第一次尝试修改的方式(playground):

const NTHREADS: usize = 8;

let channels_ring: Vec<_> = {
    let mut channels: Vec<_> = (0..NTHREADS)
        .into_iter()
        .map(|_| {
            let (tx, rx) = mpsc::channel();
            (Some(tx), Some(rx))
        })
        .collect();

    (0..NTHREADS)
        .into_iter()
        .map(|rxpos| {
            let txpos = if rxpos < NTHREADS - 1 { rxpos + 1 } else { 0 };
            (
                channels[txpos].0.take().unwrap(),
                channels[rxpos].1.take().unwrap(),
            )
        })
        .collect()
};

let children: Vec<_> = channels_ring
    .into_iter()
    .enumerate()
    .map(|(i, (tx, rx))| {
        thread::spawn(move || {
            tx.send(i as i32).unwrap();
            println!("thread {} sent: {}", i, i);
            println!("thread {} recv: {:?}", i, rx.recv().unwrap());
        })
    })
    .collect();

for child in children {
    child.join().unwrap();
}

我喜欢这个答案,因为使用智能指针比Peter的回答更接近我的最初帖子。然而,我更喜欢Peter方法的优雅性。虽然你先回答了,但如果我接受Peter的答案,可以吗? - jhscheer
1
@jhscheer 当然,选择接受哪个答案完全取决于您作为问题的作者,而不管它们到达的顺序如何。我留下了我的答案,因为它实际上提供了一个不同的解决方案,这可能对未来读者有用,他们在Peter的方法无法覆盖的情况下解决相同的问题。 - user4815162342
1
@jhscheer 还要注意的是,我的答案中没有智能指针,只有 Option,因为它更便宜,不会引入新的分配或间接。在这种情况下,它甚至不会使向量变得更大,因为 Rust 优化枚举空间,尽可能地节省空间。 - user4815162342
1
@jhscheer 点赞和接受答案不是同一回事。如果这个回答对你有帮助,你也可以点赞它。 - Peter Hall
1
转念一想,尽管我最终采用了Peter的想法,但我将接受这个答案。因为它 i) 更接近我的原始方法,ii) 帮助我更好地理解为什么我的初始代码失败以及如何在未来解决类似的问题。 - jhscheer

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接