如何在单个任务中同时从多个Tokio通道读取消息?

3

我希望能够同时读取和处理两个频道的消息,并构建另一条消息通过另一个频道进行发送。

这两个频道接收到的消息频率不同(根据sleep而定)。

例如:"foo1"和"bar1"被接收,我们将对它们进行处理并形成"foo1bar1"。"foo2"被接收("bar2"将在2秒内接收),因此我们将其处理为"foo2bar1"。"foo3"被接收,因此构建"foo3bar1"。当接收到"bar2"时,我们获得"foo4bar2",以此类推。

在当前的实现中,由于这两个任务之间没有通信,因此无法进行"fooNbarM"的构造。

use std::time::Duration;
use tokio;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::time::sleep;
use futures::future::join_all;

async fn message_sender(msg: &'static str, foo_tx: UnboundedSender<Result<&str, Box<dyn std::error::Error + Send>>>) {
    loop {
        match foo_tx.send(Ok(msg)) {
            Ok(()) => {
                if msg == "foo" {
                    sleep(Duration::from_millis(1000)).await;
                } else {
                    sleep(Duration::from_millis(3000)).await;
                }
            }
            Err(_) => {
                println!("failed to send foo");
                break;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let result: Vec<&str> = vec![];

    let (foo_tx, mut foo_rx): (
        UnboundedSender<Result<&str, Box<dyn std::error::Error + Send>>>,
        UnboundedReceiver<Result<&str, Box<dyn std::error::Error + Send>>>,
    ) = tokio::sync::mpsc::unbounded_channel();
    let (bar_tx, mut bar_rx): (
        UnboundedSender<Result<&str, Box<dyn std::error::Error + Send>>>,
        UnboundedReceiver<Result<&str, Box<dyn std::error::Error + Send>>>,
    ) = tokio::sync::mpsc::unbounded_channel();

    let foo_sender_handle = tokio::spawn(async move {
        message_sender("foo", foo_tx).await;
    });

    let foo_handle = tokio::spawn(async move {
        while let Some(v) = foo_rx.recv().await {
            println!("{:?}", v);
        }
    });

    let bar_sender_handle = tokio::spawn(async move {
        message_sender("bar", bar_tx).await;
    });

    let bar_handle = tokio::spawn(async move {
        while let Some(v) = bar_rx.recv().await {
            println!("{:?}", v);
        }
    });

    let handles = vec![foo_sender_handle, foo_handle, bar_sender_handle, bar_handle];
    join_all(handles.into_iter()).await;
}

Cargo.toml

[package]
name = "play"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1.16.1", features = ["full"] }
futures = "0.3.21"
1个回答

4

使用tokio::select等待任一通道准备就绪:

use futures::future; // 0.3.19
use std::time::Duration;
use tokio::{
    sync::mpsc::{self, UnboundedSender},
    time,
}; // 1.16.1

async fn message_sender(msg: &'static str, foo_tx: UnboundedSender<String>) {
    for count in 0.. {
        let message = format!("{msg}{count}");
        foo_tx.send(message).unwrap();

        if msg == "foo" {
            time::sleep(Duration::from_millis(100)).await;
        } else {
            time::sleep(Duration::from_millis(300)).await;
        }
    }
}

#[tokio::main]
async fn main() {
    let (foo_tx, mut foo_rx) = mpsc::unbounded_channel();
    let (bar_tx, mut bar_rx) = mpsc::unbounded_channel();

    let foo_sender_handle = tokio::spawn(message_sender("foo", foo_tx));
    let bar_sender_handle = tokio::spawn(message_sender("bar", bar_tx));

    let receive_handle = tokio::spawn(async move {
        let mut foo = None;
        let mut bar = None;

        loop {
            tokio::select! {
                f = foo_rx.recv() => foo = f,
                b = bar_rx.recv() => bar = b,
            }

            if let (Some(foo), Some(bar)) = (&foo, &bar) {
                println!("{foo}{bar}");
            }
        }
    });

    future::join_all([foo_sender_handle, bar_sender_handle, receive_handle]).await;
}

你还需要处理只收到一条消息的情况,因此Option很有用。


谢谢。这正是我想要的!如何执行 println!("{foo}{bar}")?它比 println!("{}{}", foo, bar) 更易读,但当我尝试时,会出现 error: there is no argument named foo 的错误提示。 - user270199
1
@user270199 升级到更新的 Rust 版本。请参阅 有没有一种方法可以在不重复变量名称的情况下传递命名参数给格式宏? - Shepmaster

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接