我正在使用Tokio进行一些异步Rust代码的开发,但遇到了一个问题。我有一些需要访问连接池的任务,而连接池的特性是一次只能运行固定数量(NUMCPUS) - 所有其他请求都将被阻塞,直到有空闲连接。
目前,我只是使用task::spawn_blocking,这种方式有点奏效。然而,它的缺点是一旦有512个请求在连接池上阻塞,整个Tokio的阻塞池(blocking pool)就会耗尽,所有的阻塞任务都排队等待。这将阻止任何不依赖于连接池却需要调用spawn_blocking的代码从运行。
有没有办法告诉Tokio保持一组阻塞任务并仅同时生成N个,同时允许不相关的阻塞任务运行而不排队?
spawn_blocking文档建议使用Rayon来处理CPU密集型任务,但是a)不清楚如何将Rayon与Tokio集成,b)我的任务本来就不是CPU密集型任务。
目前,我只是使用task::spawn_blocking,这种方式有点奏效。然而,它的缺点是一旦有512个请求在连接池上阻塞,整个Tokio的阻塞池(blocking pool)就会耗尽,所有的阻塞任务都排队等待。这将阻止任何不依赖于连接池却需要调用spawn_blocking的代码从运行。
有没有办法告诉Tokio保持一组阻塞任务并仅同时生成N个,同时允许不相关的阻塞任务运行而不排队?
spawn_blocking文档建议使用Rayon来处理CPU密集型任务,但是a)不清楚如何将Rayon与Tokio集成,b)我的任务本来就不是CPU密集型任务。
self.sem.acquire()
,否则信号量实际上并没有被获取,这样编译可以通过,但结果会导致与 OP 的原始代码一样的饥饿问题。此外,在进行这个过程时,接受一个闭包并将其提交给spawn_blocking()
可能是一个好主意,以避免重复该部分:https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=20be58c9d2556d36ce4710a819ec5331 - user4815162342acquire
的观点很好。然而,我理解问题的方式是,spawn_blocking
只是一种使用tokio池限制的hack,因此不再需要它(尽管根据实际用例,常规的tokio::spawn
可能是有意义的)。 - Jmbspawn_blocking()
。它也只运行一定数量的实例,但我也理解信号实例也是阻塞的,因此不使用spawn_blocking()
将会阻塞调用它的线程。OP 的问题在于未经检查地使用spawn_blocking()
淹没了spawn_blocking()
内部使用的线程池 - 而您的答案完美地解决了这个问题。 - user4815162342