当在 multiprocessing 中与 Pool 结合使用时,map 如何划分数据?

3

我有一个要在大量数据上并行计算的函数 f。这些数据可以通过多种方式进行划分,我正在尝试决定如何划分它们。我希望了解 multiprocessing.Pool 中的“map”究竟是如何分配/划分数据的,以便我能正确地决定如何拆分我的数据以及选择处理器的数量。我的输入数据不只是简单的列表,而是包含字典和列表的列表,因此理解 Pool.map 如何划分数据似乎至关重要。

话虽如此,我认为了解简单的示例就足以说明更复杂的情况。

以下代码显示我们选择了一个由 5 个进程组成的 Pool,并且数据是 [1,2,3]。这里对于如何划分数据做了什么隐含的选择?

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))
3个回答

8

这段内容没有得到官方的记录,因此您不能依赖于任何特定的行为。您可以通过传递可选参数chunksize=来强制执行。如果您不这样做,程序会使用一种算法为您生成一个chunksize值。这个值在你源代码树中的Lib/multiprocessing/Pool.py的私有函数_map_async()中可以找到。

def _map_async(self, func, iterable, mapper, chunksize=None, ...
    '''
    Helper function to implement map, starmap and their async counterparts.
    '''
    ...
    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0
    ...

len(self._pool) 是工作进程的数量。因此,默认情况下,如果工作项少于4倍进程数,则会逐一传递它们。这是您特定示例中的情况 (3 <= 4*5)。如果工作项比进程多得多,则会选择块大小,以便每个进程在map() 的生命周期内大约处理4次工作量。例如,如果列表中有500个项目,则 500 / (5*4) == 25,因此每次会将25个项目传递给一个工作进程。

为什么不一次性传递100个项目,让5个工作进程都被调用呢?因为这是一种启发式方法。传递少于100个项目是一种权衡,平衡了需要进行多少次进程间通信与负载平衡之间的关系(不同的工作可能需要完成不同的时间),但负载平衡的所有内容都无法预先知道,因此该方法更多地考虑降低进程间调用的次数(但并不绝对!)。

这就是为什么没有对其进行文档记录的原因。未来可能会使用更智能的启发式方法。


2
您可以在此处查看 multiprocessing.Pool.map 处理进程间劳动分工的方式:这里
简而言之,它将把给定的可迭代对象划分为大小为可迭代对象大小除以工作进程数乘以 4 的块。
在您的具体示例中:
In [1]: chunksize, extra = divmod(len([1,2,3]), 5 * 4)
In [2]: if extra:
   ...:     chunksize += 1
   ...:     
In [3]: chunksize
Out[3]: 1

它将生成三个大小为1的块。
您可以通过“chunksize”参数自行控制块的大小。

1
你误读了代码:divmod()返回的chunksize是0,而extra是3。然后由于extra不为0,所以chunksize增加了1。 - Tim Peters

0

我的天真理解是,池子只是按顺序处理输入列表,将前面的“n”个元素发送到池子中,然后在第一个进程再次可用之后,该进程获取下一个元素,直到没有更多元素。最后等待所有元素完成后返回。

你应该使用列表[2,2,2,5,2,2,2]和一个函数进行实验:

def f(x):
    sleep(x)
    return x * x

你是指定特定的“n”吗?(进程数量)? - Steve
是的,Pool的参数 - quamrana

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接