多进程池是否会给每个进程相同数量的任务,还是根据可用性分配任务?

23

当您将一个可迭代对象mapmultiprocessing.Pool时,迭代是否在开始时分成每个进程的队列,还是有一个公共队列,当进程空闲时从中取出任务?

    def generate_stuff():
        for foo in range(100):
             yield foo

    def process(moo):
        print moo

    pool = multiprocessing.Pool()
    pool.map(func=process, iterable=generate_stuff())
    pool.close()

所以,考虑到这个未经测试的建议代码;如果池中有4个进程,每个进程是否都分配了25件工作,还是100件工作逐一被寻找工作的进程挑选,因此每个进程可能做不同数量的工作,例如30、26、24、20件。


哦,这很相关和适用,因为我不确定map的默认chunksize是多少 - 没有指定默认值支持了下面评论中的怀疑 - 它在开始时将整个块平均分配给每个进程。 - John Mee
3
如我在回答中所提到的,您可以阅读源代码。map 函数接受参数 chunksize=None。然后,在 map_async 函数中(它使用了 map 函数),如果 chunksize 的值为 None,则会设置 chunksize, extra = divmod(len(iterable), len(self.pool) * 4)(然后,如果 extra 不等于0,则会将 chunksize 加1)。因此,如果您有一个包含8个工作进程和100个任务的线程池,那么 chunksize 将为4。 - abarnert
1
厉害!此外还解释了为什么map在开始时要遍历整个可迭代对象——它在查找长度。如果我要使用yield,那么我应该使用imap。谢谢大家! - John Mee
就像我下面所说的,这是一个权衡。map会遍历整个可迭代对象,这意味着在开始之前会有一些延迟和/或内存运行(对于100个整数来说不是什么大问题,但对于比如1000个网络爬虫结果来说可能是不可接受的,更不用说itertools.repeat了...)。但它更简单,并且您可以获得默认的chunksize而不必计算/测量/猜测。 - abarnert
还解释了为什么在运行24小时后,我的具有8个进程的10,000+长队列需要永远才能完成:每个进程都在缓慢地一个接一个地死亡——块大小超过300。每个任务需要30-60秒,难怪自第一个进程终止以来已经过去了3个小时;现在只剩下一个进程要完成了。活着并学习吧。 - John Mee
显示剩余2条评论
3个回答

27
所以,假设有4个进程在池中,根据这个未经测试的建议代码,每个进程会被分配25个工作任务,还是100个工作任务会被逐个地挑选给寻找任务的进程,使得每个进程可能完成不同数量的工作任务,例如30,26,24,20等。
嗯,显而易见的答案就是进行测试。
目前的测试可能无法告诉你太多信息,因为作业将尽快完成,即使池中的进程随着就绪状态获取工作任务,事情最终也可能平均分配。但是有一种简单的方法可以解决这个问题:
import collections
import multiprocessing
import os
import random
import time

def generate_stuff():
    for foo in range(100):
        yield foo

def process(moo):
    #print moo
    time.sleep(random.randint(0, 50) / 10.)
    return os.getpid()

pool = multiprocessing.Pool()
pids = pool.map(func=process, iterable=generate_stuff(), chunksize=1)
pool.close()
print collections.Counter(pids)

如果数字“不均匀”,那么你知道集合过程必须抓取新的就绪作业。(我明确地将chunksize设置为1,以确保每个进程在第一次分配任务时不会只得到一个大块。)

当我在一台8核机器上运行它时:

Counter({98935: 16, 98936: 16, 98939: 13, 98937: 12, 98942: 12, 98938: 11, 98940: 11, 98941: 9})

看起来进程正在动态获取新工作。

由于您特别询问了4个工作者,我将Pool()更改为Pool(4),然后得到了这个结果:

Counter({98965: 31, 98962: 24, 98964: 23, 98963: 22})

不过,有一种比测试更好的方法可以找到答案:阅读源代码

正如您所看到的,map只是调用了map_async,后者创建了一堆批次并将它们放在一个self._taskqueue对象(一个Queue.Queue实例)中。如果您继续阅读,您会发现这个队列并没有与其他进程直接共享,但是有一个池管理器线程,每当一个进程完成并返回结果时,它就会将下一个作业从队列中弹出并重新提交给该进程。

这也是您如何找到map的默认块大小的方法。上面链接的2.7实现显示,它只是len(iterable) / (len(self._pool) * 4)向上取整得到的值(比这稍微冗长一些,以避免小数算术)-或者换句话说,对于每个进程大约有4个块足够了。但您真的不应该依赖这个;文档含糊地、间接地暗示它将使用某种启发式方法,但并不保证它会是什么。因此,如果您确实需要"每个进程大约有4个块",请明确计算它。更现实的是,如果您需要除了默认值以外的任何东西,您可能需要一个特定于领域的值,需要通过计算、猜测或分析来确定。


谢谢,伙计。关于测试,我不确定如何获取计数。我在考虑是否需要想办法共享变量之类的东西。统计进程 ID 的过程很有见地。你需要在关闭后加上 pool.join() 来确保所有工作都完成了,然后再输出计数吗? - John Mee
1
请记住,map为每个作业返回一个值,并将它们连接成列表(而map_asyncimapimap_unordered以不同的方式提供相同的信息),因此您很少需要进行任何进程间共享,只需跨进程获取此类信息即可。 - abarnert
关于 join,在这种情况下你不需要它:map 会一直阻塞直到所有100个结果返回,而且没有其他代码提交作业。但是,如果你想尝试其他的作业分配方法,可能需要使用它。 - abarnert

3

http://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool.map

map(func, iterable[, chunksize])

该方法将可迭代对象切片成多个块,并将其作为单独的任务提交到进程池。可以通过将chunksize设置为正整数来指定这些块的(大约)大小。

我认为,处理完上一个块后,进程会从队列中取出下一个块。

默认的chunksize取决于iterable的长度,并且会选择使得块的数量大约是进程数量的四倍的值。(参考资料)


我注意到imap的默认块大小被指定为1,我想知道map的默认值是多少?根据我的应用程序当前所做的事情,我怀疑它会在开始时将地图分成相等的块;但不确定-因此有这个问题。 - John Mee
1
@JohnMee:imap默认为1的原因是imap不知道iterable的长度,因此它无法启发式地猜测最佳的chunksize。(是的,这意味着存在一种权衡——有时构建一个list以获取该启发式方法实际上更快。但通常,您可以通过了解问题空间来想出更好的chunksize。) - abarnert

1

如果想要估算Python实现中使用的chunksize,而不需要查看其multiprocessing模块源代码,请运行以下命令:

#!/usr/bin/env python
import multiprocessing as mp
from itertools import groupby

def work(index):
    mp.get_logger().info(index)
    return index, mp.current_process().name

if __name__ == "__main__":
    import logging
    import sys
    logger = mp.log_to_stderr()

    # process cmdline args
    try:
        sys.argv.remove('--verbose')
    except ValueError:
        pass  # not verbose
    else:
        logger.setLevel(logging.INFO)  # verbose
    nprocesses, nitems = int(sys.argv.pop(1)), int(sys.argv.pop(1))
    # choices: 'map', 'imap', 'imap_unordered'
    map_name = sys.argv[1] if len(sys.argv) > 1 else 'map'
    kwargs = dict(chunksize=int(sys.argv[2])) if len(sys.argv) > 2 else {}

    # estimate chunksize used
    max_chunksize = 0
    map_func = getattr(mp.Pool(nprocesses), map_name)
    for _, group in groupby(sorted(map_func(work, range(nitems), **kwargs),
                                   key=lambda x: x[0]),  # sort by index
                            key=lambda x: x[1]):  # group by process name
        max_chunksize = max(max_chunksize, len(list(group)))
    print("%s: max_chunksize %d" % (map_name, max_chunksize))

这表明imapimap_unordered默认使用chunksize=1,而mapmax_chunksize取决于nprocessesnitem(每个进程的块数不固定)和Python版本,如果指定了chunksize参数,则所有*map*函数都会考虑该参数。

用法

$ ./estimate_chunksize.py nprocesses nitems [map_name [chunksize]] [--verbose]

要查看各个作业的分布情况,请指定--verbose参数。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接