Python:如何检查多进程池中待处理任务的数量?

14

我有一个小的工作池(4个)和一个非常大的任务列表(5000个)。 我正在使用池并使用map_async()发送任务。 由于我运行的任务相当长,因此我强制将chunksize设置为1,以防止一个长时间运行的进程阻塞一些较短的进程。

我想定期检查还有多少任务需要提交。 我知道最多会有4个任务处于活动状态,我关心的是还有多少任务需要处理。

我已经搜索了一下,但找不到任何人这样做。

以下是一些简单的代码以帮助理解:

import multiprocessing
import time

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)

pool = multiprocessing.Pool(4)
jobs = pool.map_async(mytask, [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4], chunksize=1)
pool.close()

while True:
    if not jobs.ready():
        print("We're not done yet, %s tasks to go!" % <somethingtogettasks>)
        jobs.wait(2)
    else:
        break

我应该注意到,我正在使用RHEL-6系统上的python2.6,但是我也可以接受不同版本/平台上的示例。 - jkeating
静态变量在任务完成时递减(当然,在任务开始时递增)。 - Guillaume
任务直到工人开始处理它们才会“启动”。我想,如果我创建一个全局变量,其大小等于要完成的任务数量,然后每次任务开始时将其递减,那么可能可以解决问题,但这有点笨拙,并需要考虑一些线程安全性问题。 - jkeating
将示例代码编译并运行所需的更改:http://fpaste.org/p4Hb/。另外:https://gist.github.com/902947 - Adam Monsen
4个回答

9

看起来你需要的是 jobs._number_left。下划线_表示它是一个内部值,可能会随开发人员的意愿而改变,但这似乎是获取该信息的唯一方法。


1
啊!这不在API文档中,而且我忘记在ipython中对jobs进行dir()了。感谢您的答案! - jkeating
API文档中为什么没有_number_left这个变量?它将会被弃用或者改名吗?有什么好的理由没有将其包含在API文档中吗? - Sussch

3

如果您使用apply_async,则可以通过查看Pool._cache属性来检查待处理作业的数量。这是存储ApplyResult直到它们可用并等于待处理ApplyResult数量的位置。

import multiprocessing as mp
import random
import time


def job():
    time.sleep(random.randint(1,10))
    print("job finished")

if __name__ == '__main__':
    pool = mp.Pool(5)
    for _ in range(10):
        pool.apply_async(job)

    while pool._cache:
        print("number of jobs pending: ", len(pool._cache))
        time.sleep(2)

    pool.close()
    pool.join()

1

我不知道有没有绝对可靠的方法,但是如果你使用 Pool.imap_unordered() 函数代替 map_async,你就可以拦截被处理的元素。

import multiprocessing
import time

process_count = 4

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)
    # Actually, you should return the job you've created here.
    return num

pool = multiprocess.Pool(process_count)
jobs  = []
items = [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4]
job_count = 0
for job in pool.imap_unordered(mytask, items):
    jobs.append(job)
    job_count += 1

    incomplete = len(items) - job_count
    unsubmitted = max(0, incomplete - process_count)

    print "Jobs incomplete: %s. Unsubmitted: %s" % incomplete, unsubmitted

pool.close()

我正在减去process_count,因为你可以假设所有进程都将使用其中两个例外之一进行处理:1)如果使用迭代器,则可能没有更多的项可供消耗和处理,2)您可能只剩下少于4个项。 我没有为第一个例外编写代码。 但是,如果需要,这应该很容易做到。 无论如何,您的示例使用列表,因此不应该有这个问题。

编辑:我还意识到您正在使用While循环,这使它看起来像您正在尝试定期更新某些内容,例如每半秒钟或其他时间间隔。 我给出的示例代码不会以这种方式执行。 我不确定这是否是一个问题。


谢谢。我还没有真正探索过imap函数(文档有点简略)。不过你说得对,我想在任务进行的同时做一些其他事情,并定期报告剩余的任务数量。 - jkeating

1

我有类似的需求:跟踪进度,根据结果执行中期工作,在任意时间干净地停止所有处理。我是用apply_async一次发送一个任务来处理它的。下面是我所做的一个大大简化的版本:

maxProcesses = 4
q = multiprocessing.Queue()
pool = multiprocessing.Pool()
runlist = range(100000)
sendcounter = 0
donecounter = 0
while donecounter < len(runlist):
    if stopNowBooleanFunc():  # if for whatever reason I want to stop processing early
        if donecounter == sendcounter:  # wait til already sent tasks finish running
            break
    else:  # don't send new tasks if it's time to stop
        while sendcounter < len(runlist) and sendcounter - donecounter < maxProcesses:
            pool.apply_async(mytask, (runlist[sendcounter], q))
            sendcounter += 1

    while not q.empty():  # process completed results as they arrive
        aresult = q.get()
        processResults(aresult)
        donecounter += 1

请注意,我使用 Queue 而不是 return 返回结果。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接