如何查找活动的Tokio任务数量?

3

我想获取活动的tokio任务计数。在Python中,我可以使用len(asyncio.all_tasks())来返回当前正在运行的循环的未完成任务。我想知道是否有相应的方法在Tokio中。

以下是示例代码:

use std::time::Duration;
use tokio; // 1.24.1
use tokio::time::sleep;

fn active_tasks() -> usize {
    todo!("get active task somehow")
}

#[tokio::main]
async fn main() {
    tokio::spawn(async { sleep(Duration::from_secs(5)).await });
    tokio::spawn(async { sleep(Duration::from_secs(1)).await });
    tokio::spawn(async { sleep(Duration::from_secs(3)).await });

    println!("t = 0, running = {}", active_tasks());

    sleep(Duration::from_secs(2)).await;
    println!("t = 2, running = {}", active_tasks());

    sleep(Duration::from_secs(4)).await;
    println!("t = 6, running = {}", active_tasks());
}

我期望上述程序的输出将打印出活动任务的数量,因为主函数本身就是一个tokio任务,所以如果出现以下输出,我也不会感到惊讶:
t = 0, running = 4
t = 2, running = 3
t = 6, running = 1

active_tasks()如果需要的话可以是一个异步函数。


1
只是好奇:你需要这个数字做什么? - Chayim Friedman
我有一个Web服务器,它会启动一个长时间运行的任务,该任务又会启动许多其他长时间运行的任务。当服务器收到中止请求时,最好中止父任务及其所有子任务。我希望在生成和中止之前后看到活动任务的数量相同。 - coder3101
4
这只是为了调试目的吗?那么最好使用类似 https://github.com/tokio-rs/console 的东西。 - Chayim Friedman
这肯定有助于我的使用情况,但我会让问题保持开放状态,因为可能还有其他用例,而来自另一种语言的人可能正在寻找类似于 tokio::active_tasks() 可以返回数量的东西。(也许在未来会有) - coder3101
现在我在思考...如果tokio-console可以通过使用跟踪API从应用程序远程获取任务列表,那么我们肯定也可以做到同样的事情,对吧?这将需要tokio_unstable,但应该是可行的。等我有时间了,我会尝试着去解决这个问题。如果有人想在我之前完成这个任务,那就太好了。 - Chayim Friedman
显示剩余5条评论
2个回答

2
在 tokio 1.29 中,RuntimeMetrics 现在有一个方法 active_task_count(),它返回活动 tokio 任务的数量。
use tokio::runtime::Handle;

#[tokio::main]
async fn main() {
    let metrics = Handle::current().metrics();

    let n = metrics.active_tasks_count();
    println!("Runtime has {} active tasks", n);
}

1

我原本希望不稳定的RuntimeMetrics能够为您解决这个问题,但它似乎是为了不同的目的而设计的。我不认为Tokio能够为您处理这个问题。

话虽如此,这里有一个可能的解决方案来实现类似的结果:

use std::{
    future::Future,
    sync::{Arc, Mutex},
    time::Duration,
};
use tokio::time::sleep;

struct ThreadManager {
    thread_count: Arc<Mutex<usize>>,
}

impl ThreadManager {
    #[must_use]
    fn new() -> Self {
        Self {
            thread_count: Arc::new(Mutex::new(0)),
        }
    }

    fn spawn<T>(&self, future: T)
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
    {
        // Increment the internal count just before the thread starts.
        let count = Arc::clone(&self.thread_count);
        *count.lock().unwrap() += 1;

        tokio::spawn(async move {
            let result = future.await;
            
            // Once we've executed the future, let's decrement this thread.
            *count.lock().unwrap() -= 1;

            result
        });
    }

    fn thread_count(&self) -> usize {
        // Get a copy of the current thread count.
        *Arc::clone(&self.thread_count).lock().unwrap()
    }
}

#[tokio::main]
async fn main() {
    let manager = ThreadManager::new();

    manager.spawn(async { sleep(Duration::from_secs(5)).await });
    manager.spawn(async { sleep(Duration::from_secs(1)).await });
    manager.spawn(async { sleep(Duration::from_secs(3)).await });

    println!("t = 0, running = {}", manager.thread_count());

    sleep(Duration::from_secs(2)).await;
    println!("t = 2, running = {}", manager.thread_count());

    sleep(Duration::from_secs(4)).await;
    println!("t = 6, running = {}", manager.thread_count());
}

结果是:

t = 0, running = 3
t = 2, running = 2
t = 6, running = 0

这将大致实现您所描述的功能。为了更接近您想要的效果,您可以将管理器与lazy_static结合起来,并将其包装在名为spawn的函数中。您还可以从1开始计数以考虑主线程。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接