所以,假设有4个进程在池中,根据这个未经测试的建议代码,每个进程会被分配25个工作任务,还是100个工作任务会被逐个地挑选给寻找任务的进程,使得每个进程可能完成不同数量的工作任务,例如30,26,24,20等。
嗯,显而易见的答案就是进行测试。
目前的测试可能无法告诉你太多信息,因为作业将尽快完成,即使池中的进程随着就绪状态获取工作任务,事情最终也可能平均分配。但是有一种简单的方法可以解决这个问题:
import collections
import multiprocessing
import os
import random
import time
def generate_stuff():
for foo in range(100):
yield foo
def process(moo):
time.sleep(random.randint(0, 50) / 10.)
return os.getpid()
pool = multiprocessing.Pool()
pids = pool.map(func=process, iterable=generate_stuff(), chunksize=1)
pool.close()
print collections.Counter(pids)
如果数字“不均匀”,那么你知道集合过程必须抓取新的就绪作业。(我明确地将chunksize
设置为1,以确保每个进程在第一次分配任务时不会只得到一个大块。)
当我在一台8核机器上运行它时:
Counter({98935: 16, 98936: 16, 98939: 13, 98937: 12, 98942: 12, 98938: 11, 98940: 11, 98941: 9})
看起来进程正在动态获取新工作。
由于您特别询问了4个工作者,我将Pool()
更改为Pool(4)
,然后得到了这个结果:
Counter({98965: 31, 98962: 24, 98964: 23, 98963: 22})
不过,有一种比测试更好的方法可以找到答案:阅读源代码。
正如您所看到的,map
只是调用了map_async
,后者创建了一堆批次并将它们放在一个self._taskqueue
对象(一个Queue.Queue
实例)中。如果您继续阅读,您会发现这个队列并没有与其他进程直接共享,但是有一个池管理器线程,每当一个进程完成并返回结果时,它就会将下一个作业从队列中弹出并重新提交给该进程。
这也是您如何找到map
的默认块大小的方法。上面链接的2.7实现显示,它只是len(iterable) / (len(self._pool) * 4)
向上取整得到的值(比这稍微冗长一些,以避免小数算术)-或者换句话说,对于每个进程大约有4个块足够了。但您真的不应该依赖这个;文档含糊地、间接地暗示它将使用某种启发式方法,但并不保证它会是什么。因此,如果您确实需要"每个进程大约有4个块",请明确计算它。更现实的是,如果您需要除了默认值以外的任何东西,您可能需要一个特定于领域的值,需要通过计算、猜测或分析来确定。
map
的默认chunksize
是多少 - 没有指定默认值支持了下面评论中的怀疑 - 它在开始时将整个块平均分配给每个进程。 - John Meemap
函数接受参数chunksize=None
。然后,在map_async
函数中(它使用了map
函数),如果chunksize
的值为None
,则会设置chunksize, extra = divmod(len(iterable), len(self.pool) * 4)
(然后,如果extra
不等于0,则会将chunksize
加1)。因此,如果您有一个包含8个工作进程和100个任务的线程池,那么chunksize
将为4。 - abarnertmap
在开始时要遍历整个可迭代对象——它在查找长度。如果我要使用yield
,那么我应该使用imap
。谢谢大家! - John Meemap
会遍历整个可迭代对象,这意味着在开始之前会有一些延迟和/或内存运行(对于100个整数来说不是什么大问题,但对于比如1000个网络爬虫结果来说可能是不可接受的,更不用说itertools.repeat
了...)。但它更简单,并且您可以获得默认的chunksize
而不必计算/测量/猜测。 - abarnert