如何使用多进程和pool.map跟踪状态?

7

我第一次设置多进程模块,基本上,我计划做的事情是:

from multiprocessing import pool
pool = Pool(processes=102)
results = pool.map(whateverFunction, myIterable)
print 1

据我理解,只有当所有进程都返回并完成结果后,1 才会被打印出来。我希望能够得到一些关于这方面状态的更新。最好的实现方式是什么?
如果有大约200个值,我有些犹豫让 whateverFunction() 进行打印操作。尤其是如果会打印出200次“处理完成”,那将不太有用。
我期望的输出格式如下:
10% of myIterable done
20% of myIterable done
1个回答

10

pool.map 会一直阻塞,直到所有并发的函数调用完成。 pool.apply_async 不会阻塞。此外,您可以使用其 callback 参数来报告进度。回调函数 log_result 在每次 foo 完成时被调用。它接收由 foo 返回的值。

from __future__ import division
import multiprocessing as mp
import time

def foo(x):
    time.sleep(0.1)
    return x

def log_result(retval):
    results.append(retval)
    if len(results) % (len(data)//10) == 0:
        print('{:.0%} done'.format(len(results)/len(data)))

if __name__ == '__main__':
    pool = mp.Pool()
    results = []
    data = range(200)
    for item in data:
        pool.apply_async(foo, args=[item], callback=log_result)
    pool.close()
    pool.join()
    print(results)
产生。
10% done
20% done
30% done
40% done
50% done
60% done
70% done
80% done
90% done
100% done
[0, 1, 2, 3, ..., 197, 198, 199]

上述的log_result函数修改了全局变量results并访问了全局变量data。您无法将这些变量传递给log_result,因为在pool.apply_async中指定的回调函数始终只使用一个参数即foo的返回值。

但是,您可以创建一个闭包,这至少可以清楚地表明log_result依赖哪些变量:

from __future__ import division
import multiprocessing as mp
import time

def foo(x):
    time.sleep(0.1)
    return x

def make_log_result(results, len_data):
    def log_result(retval):
        results.append(retval)
        if len(results) % (len_data//10) == 0:
            print('{:.0%} done'.format(len(results)/len_data))
    return log_result

if __name__ == '__main__':
    pool = mp.Pool()
    results = []
    data = range(200)
    for item in data:
        pool.apply_async(foo, args=[item], callback=make_log_result(results, len(data)))
    pool.close()
    pool.join()
    print(results)

太好了。我看到你在log_result()函数内部使用了函数外的变量。我能不能做类似于callback=lambda x: log_result(x, results)这样的操作来避免这种情况呢? - FooBar
是的,你可以这样做,但 lambda 函数也将访问其作用域外的变量。由于回调函数 必须 接受一个且仅一个参数,即 foo 的返回值,因此无法将 results(和 data)作为本地变量提供给回调函数。但是,您可以使用闭包将 resultslen_data 变量设置为父函数的非局部作用域中的变量,而不是全局变量。我已编辑上面的帖子以显示我的意思。 - unutbu
哇,那是一个非常聪明的结构。 - FooBar

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接