如何在不阻塞父任务的情况下,在另一个任务中生成长时间运行的Tokio任务?

4
我将尝试构建一个对象,可以管理websocket的提要(feed),但要能够在多个提要(feed)之间切换。
有一个Feed特质:
trait Feed {
    async fn start(&mut self);
    async fn stop(&mut self);
}

有三个结构体实现了FeedABC

当调用start时,它会开始一个无限循环,监听来自websocket的消息并处理每个消息。

我想要实现一个FeedManager,它维护单个活动的feed,但可以接收命令以切换使用哪个feed源。

enum FeedCommand {
    Start(String),
    Stop,
}

struct FeedManager {
    active_feed_handle: tokio::task::JoinHandle,
    controller: mpsc::Receiver<FeedCommand>,
}

impl FeedManager {
    async fn start(&self) {
        while let Some(command) = self.controller.recv().await {
            match command {
                FeedCommand::Start(feed_type) => {
                    // somehow tell the active feed to stop (need channel probably) or kill the task?

                    if feed_type == "A" {
                        // replace active feed task with a new tokio task for consuming feed A
                    } else if feed_type == "B" {
                        // replace active feed task with a new tokio task for consuming feed B
                    } else {
                        // replace active feed task with a new tokio task for consuming feed C
                    }
                }
            }
        }
    }
}

我很难理解如何正确管理所有的Tokio任务。 FeedManager 的核心循环是永久监听新进来的命令,但它需要能够生成另一个长期运行的任务而不会在此过程中发生阻塞(以便它可以继续监听命令)。

我的第一次尝试是:

if feed_type == "A" {
    self.active_feed_handle = tokio::spawn(async {
        A::new().start().await;
    });

    self.active_feed_handle.await
}
  • 在句柄上使用.await会导致核心循环不再接受命令,是吗?
  • 我可以省略最后一个.await,任务仍然运行吗?
  • 我需要以某种方式清理当前活动任务吗?

不活跃的订阅源中的消息你希望怎样处理?它们应该永久排队还是在没有人听取时被丢弃,还是其他什么方式? - GManNickG
当一个 feed 被停止时,它将关闭 websocket,因此它应该停止存在。 - John Cantrell
很难回答你的问题,因为它没有包含一个 [MRE]。我们无法确定代码中存在哪些 crates(及其版本)、types、traits、fields等。如果可能的话,您可以在Rust Playground上尝试重现错误,否则可以在全新的Cargo项目中进行,然后[编辑]您的问题以包括额外的信息。这里有一些Rust特定的MRE提示,您可以使用它们来缩小您的原始代码以便在此处发布。谢谢! - Shepmaster
值得注意的是,您的代码没有使用有效的 Rust 语法 —— 在当今的 Rust 中,特质中不能使用 async fn - Shepmaster
2个回答

7

您可以通过生成任务来启动一个长时间运行的Tokio任务,而不会阻塞父任务——这是任务存在的主要原因之一。如果您不 .await 该任务,则不会等待该任务:

use std::time::Duration;
use tokio::{task, time}; // 1.3.0

#[tokio::main]
async fn main() {
    task::spawn(async {
        time::sleep(Duration::from_secs(100)).await;
        eprintln!(
            "You'll likely never see this printed \
            out because the parent task has exited \
            and so has the entire program"
        );
    });
}

另请参阅:


0

实现这个的一种方法是使用Tokio的join!()宏,该宏接受多个future并等待它们全部完成。您可以创建多个future并将它们join!()在一起以便集体等待。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接