如何监控Python的concurrent.futures.ProcessPoolExecutor?

14

我们正在使用concurrent.futures中的ProcessPoolExecutor在一个异步接收请求的服务中,实际的同步处理是在进程池中进行的。

一旦我们遇到进程池耗尽的情况,新的请求就必须等待直到其他一些进程完成。

是否有一种方法可以查询进程池的当前使用情况?这将允许我们监视它们的状态并进行适当的容量规划。

如果没有,是否有任何良好的替代进程池实现,具有支持此类监视/容量规划的异步界面?


3
只需查看工作队列 ProcessPoolExecutor._pending_work_items 的长度即可。如果大于零,则表示有等待处理的工作项。 - fpbhb
@fpbhb 这是一个私有属性,这就是不使用它的好理由,而且它也是一个二进制信号,所以不适合预防措施。所以,谢谢,但我希望得到更好的东西。 - moritz
这是Python,不是吗?但是撇开这个不谈:你想要实现什么目标?动态调整工作人员数量以永远不让工作等待?这既不受concurrent.futures也不受multiprocessing.pool支持。而且这也有点毫无意义,因为一旦你的硬件资源耗尽,某些东西必须等待。 - fpbhb
你需要一个跨越多台机器的进程池,才能完成这个编程任务。现在你正在使用的进程池是无法实现这个目标的。你需要一个类似 AMQP 的网络机制来分散负载。 - fpbhb
@fpbhb,已经涉及到AMQP,而且该服务已经在两台机器上运行。 - moritz
显示剩余3条评论
2个回答

14

最简单的方法是扩展ProcessPoolExecutor以获得所需的行为。下面的示例保持stdlib接口,不访问实现细节:

from concurrent.futures import ProcessPoolExecutor


class MyProcessPoolExecutor(ProcessPoolExecutor):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._running_workers = 0

    def submit(self, *args, **kwargs):
        future = super().submit(*args, **kwargs)
        self._running_workers += 1
        future.add_done_callback(self._worker_is_done)
        return future

    def _worker_is_done(self, future):
        self._running_workers -= 1

    def get_pool_usage(self):
        return self._running_workers

1
我不得不为_worker_is_done的参数列表进行修复(在上面的文本中已经修复),它接收future作为参数,因此除了self之外还需要一个参数。现在它能正常工作了,谢谢! - moritz
结果不是正在运行的工作人员的规模,它错误地包括了待处理的任务。 - undefined

2

最近我用一种稍微不同的方法解决了这个问题。简化来说,我的做法如下:

  • 我在主循环的范围内定义一个 pending futures 集合,用于外部跟踪未完成的 futures。
  • 我为每个 future 添加一个回调函数,此回调函数是闭包,可操作 futures 集合,在处理完后将其从集合中移除。

因此,假设 done() 是实际的回调函数(在其他地方定义),则以下内容被定义在我的主循环的范围内:

bag = set()

def make_callback(b):

    def callback(f):
        nonlocal b
        b.remove(f)
        done(f)

    return callback

对于我提交给ProcessPoolExecutor的每个未来f,我都会添加回调函数:

f.add_done_callback(make_callback(bag))

随时可以通过查看bag的内容来查看待处理和正在运行的future列表,可选择使用future的running()方法进行过滤。例如:

print(*bag, sep='\n')
print('running:', *(f for f in bag if f.running()))

对于许多简单的用例,模块级别的设置变量可能与闭包一样有效。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接