trait `std::marker::Sync`没有为`std::sync::mpsc::Sender<i32>`实现

7
我正在尝试使用MPSC构建一个多线程应用程序,但遇到了标题中的错误。我不确定这种情况下的正确模式是什么 - 我正在寻找一种模式,可以克隆生产者通道并将其移动到新线程中以供使用。
这个新线程将保持一个打开的websocket,并在收到websocket消息时通过生产者发送消息数据的子集。由于消费者线程需要来自其他线程的数据,这就是我认为MPSC模式是合适选择的原因。
除了标题中的错误信息外,还显示了以下内容:
`std::sync::mpsc::Sender<i32>` cannot be shared between threads safely
help: the trait `std::marker::Sync` is not implemented for `std::sync::mpsc::Sender`

我应该实现Send吗?这是使用RcPin的适当时机吗?我认为这是因为我正在尝试在async闭包中通过.await发送一个未实现Send的类型,但我不知道如何处理这种情况。

我已经将我的问题简化为以下内容:

use futures::stream::{self, StreamExt};
use std::sync::mpsc::{channel, Receiver, Sender};

#[tokio::main]
async fn main() {
    let (tx, rx): (Sender<i32>, Receiver<i32>) = channel();

    tokio::spawn(async move {
        let a = [1, 2, 3];
        let mut s = stream::iter(a.iter())
            .cycle()
            .for_each(move |int| async {
                tx.send(*int);
            })
            .await;
    });
}

5
你需要为每个线程克隆发送者。但除此之外,在这种情况下,你可能不想使用std::sync::mpsc通道,因为它们会阻塞线程。相反,你可能希望查找tokio提供的支持异步/等待的通道。 - Locke
使用tokio库中的async通道确实是我正在寻找的解决方案。谢谢@Locke! - David Miner
1个回答

5

你的代码存在几个问题。首先是在最内层的async块中缺少一个move,因此编译器尝试借用对tx的引用。这就是为什么你会得到错误提示:Sendertx的类型)没有实现Sync

一旦添加了缺失的move,你会得到不同的错误提示:

error[E0507]: cannot move out of `tx`, a captured variable in an `FnMut` closure

现在的问题是for_each()会多次调用闭包,所以你不能将tx移入异步块中——因为在第一次调用闭包后就没有可移动的对象了。
由于MPSC通道允许多个生产者,Sender 实现了 Clone,因此您可以在将其移动到异步块之前对其进行克隆。以下代码可编译:
let (tx, _rx): (Sender<i32>, Receiver<i32>) = channel();

tokio::spawn(async move {
    let a = [1, 2, 3];
    let _s = stream::iter(a.iter())
        .cycle()
        .for_each(move |int| {
            let tx = tx.clone();
            async move {
                tx.send(*int).unwrap();
            }
        })
        .await;
});

Playground

最后,在评论中指出,你几乎肯定想在这里使用异步通道。虽然您创建的通道是无界的,因此发送者永远不会阻塞,但当没有消息时,接收器将阻塞,从而停止整个执行线程。

恰好,tokio MPSC 通道的发送方也实现了 Sync,使得代码接近您问题中的代码:

let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

tokio::spawn(async move {
    let a = [1, 2, 3];
    let _s = stream::iter(a.iter())
        .cycle()
        .for_each(|int| async {
            tx.send(*int).unwrap();
        })
        .await;
});

assert_eq!(rx.recv().await, Some(1));
assert_eq!(rx.recv().await, Some(2));
assert_eq!(rx.recv().await, Some(3));

Playground


6
这个回答不仅正确地回答了我的问题,还提供了足够的上下文帮助我理解一些有关 Rust 异步编程的要点,这些要点我无法从文档中理解。关于同步接收器如何阻塞甚至对于无界通道也是特别有帮助的,这有助于理解同步和异步模式之间的关系。 谢谢你,我很感激你花时间以平实而彻底的方式解释这个问题。 - David Miner

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接