我正在尝试使用MPSC构建一个多线程应用程序,但遇到了标题中的错误。我不确定这种情况下的正确模式是什么 - 我正在寻找一种模式,可以克隆生产者通道并将其移动到新线程中以供使用。
这个新线程将保持一个打开的websocket,并在收到websocket消息时通过生产者发送消息数据的子集。由于消费者线程需要来自其他线程的数据,这就是我认为MPSC模式是合适选择的原因。
除了标题中的错误信息外,还显示了以下内容:
这个新线程将保持一个打开的websocket,并在收到websocket消息时通过生产者发送消息数据的子集。由于消费者线程需要来自其他线程的数据,这就是我认为MPSC模式是合适选择的原因。
除了标题中的错误信息外,还显示了以下内容:
`std::sync::mpsc::Sender<i32>` cannot be shared between threads safely
help: the trait `std::marker::Sync` is not implemented for `std::sync::mpsc::Sender`
我应该实现Send
吗?这是使用Rc
或Pin
的适当时机吗?我认为这是因为我正在尝试在async
闭包中通过.await
发送一个未实现Send
的类型,但我不知道如何处理这种情况。
我已经将我的问题简化为以下内容:
use futures::stream::{self, StreamExt};
use std::sync::mpsc::{channel, Receiver, Sender};
#[tokio::main]
async fn main() {
let (tx, rx): (Sender<i32>, Receiver<i32>) = channel();
tokio::spawn(async move {
let a = [1, 2, 3];
let mut s = stream::iter(a.iter())
.cycle()
.for_each(move |int| async {
tx.send(*int);
})
.await;
});
}
std::sync::mpsc
通道,因为它们会阻塞线程。相反,你可能希望查找tokio
提供的支持异步/等待的通道。 - Locketokio
库中的async
通道确实是我正在寻找的解决方案。谢谢@Locke! - David Miner