检查 `concurrent.futures.ThreadPoolExecutor` 的运行情况

24

我有一个活动的concurrent.futures.ThreadPoolExecutor。我想要检查它的状态,想知道有多少个线程,有多少个处理任务和哪些任务,有多少个是空闲的,以及哪些任务在队列中。我该如何找出这些信息?


我不相信这些操作是 API 的一部分。你可能需要深入源代码来查看这些内部操作:http://hg.python.org/cpython/file/default/Lib/concurrent/futures/thread.py http://hg.python.org/cpython/file/default/Lib/concurrent/futures/process.py - hughdbrown
现在还无法确定哪些线程正在忙碌。代码中有一个TODO,指出应该添加这种能力,以便如果已经有空闲的线程可以处理任务,则不会创建新的线程来处理任务。 - dano
2个回答

14

可以查看池和待处理工作项队列的一些可见性。要了解可用内容,请打印poolx.__dict__以查看结构。阅读ThreadPool代码,它很好:concurrent.futures.thread

以下创建一个带有一个线程的池。然后创建两个作业:一个睡眠3秒,另一个立即返回。然后打印出池中待处理工作项的数量。

接下来,我们从工作队列中打印出项目。在这种情况下,一个线程已经执行time.sleep(3)函数,因此不在队列中。打印具有参数[0]和kwargs {}sleep函数,因为那是池要运行的下一个工作项。

感谢@dano提供非破坏性队列洞察力,以及@abarnert。

来源

import concurrent.futures, time

poolx = concurrent.futures.ThreadPoolExecutor(max_workers=1)
poolx.submit(time.sleep, 3)
poolx.submit(time.sleep, 0)   # very fast

print('pending:', poolx._work_queue.qsize(), 'jobs')
print('threads:', len(poolx._threads))
print()

# TODO: make thread safe; work on copy of queue?
print('Estimated Pending Work Queue:')
for num,item in enumerate(poolx._work_queue.queue):
    print('{}\t{}\t{}\t{}'.format(
        num+1, item.fn, item.args, item.kwargs,
        ))

poolx.shutdown(wait=False)

输出

pending: 1 jobs
threads: 1

Pending Work Queue:
1   <built-in function sleep>   (0,)    {}

1
你可以在不破坏队列的情况下查看其中的内容:for item in poolx._work_queue.queue: print(item.fn, item.args, item.kwargs) - dano
3
这段代码不是线程安全的。pool._work_queue 这一部分没问题,但是对 pool._work_queue.queue 进行迭代是有问题的。这个队列成员只应该在适当同步时使用;你的迭代器可能会被另一个线程推入或弹出而无效。在低争用场景下通常可以在 CPython 中成功运行,但在没有 GIL 的实现或高争用情况下(即你真正需要队列的情况)会失败。 - abarnert
1
我想你可以在迭代工作队列之前获取 pool._work_queue.mutex。这是 queue.Queue 用于同步的全部内容。 - dano
@shavenwarthog:我不知道“在队列的副本上工作”是否会对此有所帮助。是否有同步复制操作?如果没有,那么它只会缩小您的竞争条件,使其更难调试,而不是消除它。 - abarnert
1
有人知道这个是哪个版本的Python吗?我正在尝试,但在<threadpool>._work_queue.queue上出现属性错误。AttributeError: '_queue.SimpleQueue'对象没有'queue'属性。 - smokes2345
显示剩余3条评论

0

没有非常干净和可靠的方法来查找pending futures,但我是这样做的:

if 'state=pending' in str(future):
    logger.debug('PENDING')
elif future.running():
    logger.debug('RUNNING')
elif future.cancelled():
    logger.debug('CANCELLED')
elif future.exception():
    logger.debug('EXCEPTION')
elif future.done():
    logger.debug('DONE')

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接