如何在Tokio中创建一个专门用于CPU密集型工作的线程池?

16

我有一个基于Tokio运行时的Rust异步服务器。它需要处理一些对延迟敏感的I/O绑定请求和重度CPU绑定请求。

我不想让CPU绑定任务垄断Tokio运行时并使I/O绑定任务饥饿,因此我想将CPU绑定任务卸载到专用的、隔离的线程池中(这里关键是隔离,因此在共享线程池上的spawn_blocking/block_in_place是不够的)。如何在Tokio中创建这样一个线程池?

启动两个运行时的天真方法会遇到错误:

thread 'tokio-runtime-worker' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like block_on) attempted to block the current thread while the thread is being used to drive asynchronous tasks.'

use tokio; // 0.2.20

fn main() {
    let mut main_runtime = tokio::runtime::Runtime::new().unwrap();
    let cpu_pool = tokio::runtime::Builder::new().threaded_scheduler().build().unwrap();
    let cpu_pool = cpu_pool.handle().clone(); // this is the fix/workaround!

    main_runtime.block_on(main_runtime.spawn(async move {
        cpu_pool.spawn(async {}).await
    }))
    .unwrap().unwrap();
}

Tokio是否可以允许两个独立的运行时?创建Tokio中一个隔离的CPU池有更好的方法吗?


我知道block_in_place,但它没有我所寻求的隔离保证。 - Kornel
我需要的是隔离保证,那么这个保证是什么?请修改您的问题以阐明所有需求。 - Shepmaster
3
@Shepmaster 我认为已经有了:“我不想让CPU绑定的任务垄断Tokio运行时并使I/O绑定的任务饥饿。” - Sven Marnach
4个回答

16

虽然Tokio已经有一个线程池,但Tokio的文档建议

如果您的代码是CPU密集型的,并且希望限制用于运行它的线程数,您应该在另一个线程池(例如Rayon)上运行它。您可以使用oneshot通道将结果发送回Tokio,当Rayon任务完成时。

因此,如果您想创建一个线程池来大量使用CPU,一个好的方法是使用像Rayon这样的crate,并将结果发送回Tokio任务。


3
spawn_blocking同样指出:为了在较少的线程上运行计算密集型任务,你应该使用单独的线程池(例如rayon),而不是配置阻塞线程的数量。 - Shepmaster
2
使用 rayon 需要通过异步通道将结果发送回来,这个过程相当繁琐;( - Kornel
1
@Kornel,我不认为通道很麻烦,但是与tokio有什么区别呢?我认为tokio生成了类似的东西。您需要一种方法来发送结果回来。不要忘记,多线程的重点是使用更多的CPU来完成更多的工作,但是开销是存在的。关键是人们只应在这种过度消耗比多线程带来的收益更糟糕时才使用多线程。 - Stargateur

8

Tokio的错误信息误导了。问题是由于在异步上下文中丢弃Runtime对象造成的。

解决方法是使用Handle而不是直接使用Runtime在其他运行时上启动任务。

fn main() {
    let mut main_runtime = tokio::runtime::Runtime::new().unwrap();
    let cpu_pool = tokio::runtime::Builder::new().threaded_scheduler().build().unwrap();

    // this is the fix/workaround:
    let cpu_pool = cpu_pool.handle().clone(); 

    main_runtime.block_on(main_runtime.spawn(async move {
        cpu_pool.spawn(async {}).await
    }))
    .unwrap().unwrap();
}

1
我不明白在你的情况下创建另一个运行时的意义所在。 - Stargateur

6

启动Tokio运行时已经创建了一个线程池。相关选项为

粗略地说,core_threads 控制用于处理异步代码的线程数。 max_threads - core_threads 是用于阻塞工作的线程数(强调我的):

否则,由于 core_threads 总是活动的,它会限制额外的线程(例如用于阻塞注释的线程),线程数为 max_threads - core_threads

您还可以通过tokio::main属性指定这些选项。

然后,您可以使用以下任一注释来注释阻塞代码:

另请参见:

spawn_blocking 可以轻松地占用单个运行时中所有可用的线程,迫使其他 future 等待它们。

你可以利用像Semaphore这样的技术来限制此情况下的最大并行性。

3
这并没有回答我的问题。我正在寻找一个专门的线程池,它不与我的主运行时共享线程。我已经在使用 block_in_place,但这会破坏我的延迟时间,并且被阻塞的线程会耗尽网络连接。 - Kornel
@Kornel 请解释一下为什么 spawn_blocking 不能实现这个目标? - Shepmaster
4
因为我有很多非常耗费 CPU 的任务,所以 spawn_blocking 可以轻易地占用仅有的运行时中所有可用的线程,迫使其他未来任务在其上等待。 - Kornel

0

到目前为止,这里没有讨论过的一个可能性是,在主运行时中创建2个线程,并使用信号量来确保您不会同时启动超过1个CPU任务的CPU绑定任务。这留下了一个保证的线程来服务非阻塞IO。

如果您的机器支持超线程技术,我仍然会使用默认数量的线程,因为对于每个IO和CPU绑定任务,您将拥有num物理核心线程,这可能是最优的。但是,如果需要,您可以增加它:

use tokio::runtime::Builder;
use tokio::sync::Semaphore;

// Half of the available threads used in the runtime
// Note: const_new requires the parking_lot feature.
static CPU_SEM: Semaphore = Semaphore::const_new(4);

fn main() {
    // build runtime with specified number of worker threads
    let runtime = Builder::new_multi_thread()
        .worker_threads(CPU_SEM.available_permits()*2)
        .build()
        .unwrap();

    runtime.block_on(runtime.spawn(async move {
       cpu_task().await        
    })).unwrap(); 
}

async fn cpu_task() {
    // The task will sleep until a permit is available
    let permit = CPU_SEM.acquire().await.unwrap();

    // Do CPU heavy work here, we won't starve the runtime

    // permit is dropped, letting another CPU task run
}

然而,查看文档和所有答案(包括这个)似乎都已经过时了。Tokio似乎支持单独的阻塞线程池:
pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self

指定运行时生成的额外线程的限制。
这些线程用于像通过spawn_blocking生成的任务这样的阻塞操作。与worker_threads不同,它们并不总是活动的,并且如果闲置时间过长,它们将退出。您可以使用thread_keep_alive更改此超时持续时间。
默认值为512。
因此,现在您只需使用spawn_blocking即可。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接