如何从标准库生成的线程发送消息到Tokio异步任务?

3

我有一个设置,我的程序使用std :: thread :: spawn 生成多个线程进行CPU绑定计算。

我需要一个GRPC服务器来处理传入的命令,并且还要流式传输工作线程完成的输出。 我正在使用 tonic 用于GRPC服务器,它只提供了在Tokio未来中的异步实现。

我需要能够从“正常”的标准库线程向Tokio未来发送消息。

我将代码简化为最小值:

use std::thread;
use tokio::sync::mpsc; // 1.9.0

fn main() {
    let (tx, mut rx) = mpsc::channel(1);

    let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
    tokio_runtime.spawn(async move {
        // the code below starts the GRPC server in reality, here I'm just demonstrating trying to receive a message
        while let Some(v) = rx.recv().await {}
    });

    let h = thread::spawn(move || {
        // do work
        tx.send(1).await; //<------ error occurs here since I can't await in a non-async block
    });

    h.join().unwrap();
}

我的主工作线程如何与由Tokio生成的GRPC服务器通信?

使用无界通道,这样非异步线程就不需要等待。 - Colonel Thirty Two
mpsc::channel(1) 这里是一个 Tokio 通道,我看到这个函数必须接受一个大于 0 的缓冲区。 - l3utterfly
我需要 tokio::sync::mpsc 通道,因为在 tokio 运行时中,我需要使用 recv().await ,这样它就不会阻塞。 - l3utterfly
1
UnboundedSender.send 不是异步的,也不会阻塞。https://docs.rs/tokio/1.9.0/tokio/sync/mpsc/fn.unbounded_channel.html - Colonel Thirty Two
1
使用无界队列通常是一个非常糟糕的想法,因为它可能会导致OOM和其他类型的资源耗尽。使用带有blocking_send()的有界通道更好。 - Svetlin Zarev
1个回答

6
您可以使用Tokio的sync功能。有两个选项 - UnboundedSenderSender::blocking_send()
无限制的发送者存在问题,即如果生产者比消费者快,则应用程序可能会因为内存不足错误或耗尽生产者使用的其他有限资源而崩溃。
总的来说,您应该避免使用无限制队列,这让我们选择更好的选项-使用blocking_send()Playground:
use std::thread;
use tokio::sync::mpsc; // 1.9.0

fn main() {
    let (tx, mut rx) = mpsc::channel(1);

    let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
    tokio_runtime.spawn(async move {
        // the code below starts the GRPC server in reality, here I'm just demonstrating trying to receive a message
        while let Some(v) = rx.recv().await {
            println!("Received: {:?}", v);
        }
    });

    let h = thread::spawn(move || {
        // do work
        tx.blocking_send(1).unwrap();
    });

    h.join().unwrap();
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接