我们正在使用concurrent.futures
中的ProcessPoolExecutor在一个异步接收请求的服务中,实际的同步处理是在进程池中进行的。
一旦我们遇到进程池耗尽的情况,新的请求就必须等待直到其他一些进程完成。
是否有一种方法可以查询进程池的当前使用情况?这将允许我们监视它们的状态并进行适当的容量规划。
如果没有,是否有任何良好的替代进程池实现,具有支持此类监视/容量规划的异步界面?
我们正在使用concurrent.futures
中的ProcessPoolExecutor在一个异步接收请求的服务中,实际的同步处理是在进程池中进行的。
一旦我们遇到进程池耗尽的情况,新的请求就必须等待直到其他一些进程完成。
是否有一种方法可以查询进程池的当前使用情况?这将允许我们监视它们的状态并进行适当的容量规划。
如果没有,是否有任何良好的替代进程池实现,具有支持此类监视/容量规划的异步界面?
最简单的方法是扩展ProcessPoolExecutor
以获得所需的行为。下面的示例保持stdlib接口,不访问实现细节:
from concurrent.futures import ProcessPoolExecutor
class MyProcessPoolExecutor(ProcessPoolExecutor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._running_workers = 0
def submit(self, *args, **kwargs):
future = super().submit(*args, **kwargs)
self._running_workers += 1
future.add_done_callback(self._worker_is_done)
return future
def _worker_is_done(self, future):
self._running_workers -= 1
def get_pool_usage(self):
return self._running_workers
_worker_is_done
的参数列表进行修复(在上面的文本中已经修复),它接收future作为参数,因此除了self
之外还需要一个参数。现在它能正常工作了,谢谢! - moritz最近我用一种稍微不同的方法解决了这个问题。简化来说,我的做法如下:
因此,假设 done()
是实际的回调函数(在其他地方定义),则以下内容被定义在我的主循环的范围内:
bag = set()
def make_callback(b):
def callback(f):
nonlocal b
b.remove(f)
done(f)
return callback
对于我提交给ProcessPoolExecutor的每个未来f
,我都会添加回调函数:
f.add_done_callback(make_callback(bag))
随时可以通过查看bag
的内容来查看待处理和正在运行的future列表,可选择使用future的running()
方法进行过滤。例如:
print(*bag, sep='\n')
print('running:', *(f for f in bag if f.running()))
对于许多简单的用例,模块级别的设置变量可能与闭包一样有效。
ProcessPoolExecutor._pending_work_items
的长度即可。如果大于零,则表示有等待处理的工作项。 - fpbhbconcurrent.futures
也不受multiprocessing.pool
支持。而且这也有点毫无意义,因为一旦你的硬件资源耗尽,某些东西必须等待。 - fpbhb