有没有办法为tokio::spawn_blocking创建多个池,以便一些任务不会饿死其他任务?

3
我正在使用Tokio进行一些异步Rust代码的开发,但遇到了一个问题。我有一些需要访问连接池的任务,而连接池的特性是一次只能运行固定数量(NUMCPUS) - 所有其他请求都将被阻塞,直到有空闲连接。
目前,我只是使用task::spawn_blocking,这种方式有点奏效。然而,它的缺点是一旦有512个请求在连接池上阻塞,整个Tokio的阻塞池(blocking pool)就会耗尽,所有的阻塞任务都排队等待。这将阻止任何不依赖于连接池却需要调用spawn_blocking的代码从运行。
有没有办法告诉Tokio保持一组阻塞任务并仅同时生成N个,同时允许不相关的阻塞任务运行而不排队?
spawn_blocking文档建议使用Rayon来处理CPU密集型任务,但是a)不清楚如何将Rayon与Tokio集成,b)我的任务本来就不是CPU密集型任务。
1个回答

3

您可以使用Semaphore:初始化它的并发任务数量,每个任务在处理之前都需要获取信号量并在处理完后释放。类似以下代码(未经测试):

use tokio::sync::Semaphore;

struct Pool {
    sem: Semaphore,
}

impl Pool {
    fn new (size: usize) -> Self {
        Pool { sem: Semaphore::new (size), }
    }

    async fn spawn<T> (&self, f: T) -> T::Output
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
    {
        let handle = self.sem.acquire().await;
        f.await
    }
}

2
你需要等待 self.sem.acquire(),否则信号量实际上并没有被获取,这样编译可以通过,但结果会导致与 OP 的原始代码一样的饥饿问题。此外,在进行这个过程时,接受一个闭包并将其提交给 spawn_blocking() 可能是一个好主意,以避免重复该部分:https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=20be58c9d2556d36ce4710a819ec5331 - user4815162342
关于等待acquire的观点很好。然而,我理解问题的方式是,spawn_blocking只是一种使用tokio池限制的hack,因此不再需要它(尽管根据实际用例,常规的tokio::spawn可能是有意义的)。 - Jmb
有趣的观察。我理解 OP 的问题是因为他们调用的 API 本质上是阻塞的,所以需要 spawn_blocking()。它也只运行一定数量的实例,但我也理解信号实例也是阻塞的,因此不使用 spawn_blocking() 将会阻塞调用它的线程。OP 的问题在于未经检查地使用 spawn_blocking() 淹没了 spawn_blocking() 内部使用的线程池 - 而您的答案完美地解决了这个问题。 - user4815162342

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接