Python itertools 与多进程 - 处理大列表 vs 使用迭代器的低效CPU利用率

9

我正在处理n个元素(以下称为“pair”)的重复变化,用作函数参数。显然,只要“r”列表不足以消耗所有内存,一切都正常运行。问题在于最终我必须对6个元素进行超过16次重复。我在云中使用40核系统来完成这项工作。

代码如下所示:

if __name__ == '__main__':
  pool = Pool(39)
  r = itertools.product(pairs,repeat=16)
  pool.map(f, r)

我觉得应该使用迭代器而不是一开始就创建一个巨大的列表,这里问题开始了...
我尝试使用以下代码解决这个问题:
if __name__ == '__main__':
  pool = Pool(39)
  for r in itertools.product(pairs,repeat=14):
    pool.map(f, r)

内存问题已经解决,但是每个核心的CPU使用率只有5%。现在单核版本的代码比这个更快。
如果您能给我一些指导,我将不胜感激。
谢谢。

附注:如果您使用的是现代Python(Python 3.3或更高版本),最好使用带有“with”语句的Pool,以便Pool工作进程可以可预测地清理。只需将pool = Pool(39)更改为with Pool(39) as pool:并缩进使用池的下面的行;当退出块时,工作进程会立即清理。 - ShadowRanger
2个回答

7

你原来的代码没有在自己的代码中创建一个 listitertools.product 返回一个生成器),但是 pool.map 实现了整个生成器(因为它假设如果你可以存储所有输出,你也可以存储所有输入)。

不要在这里使用 pool.map。如果你需要有序的结果,请使用 pool.imap,或者如果结果顺序不重要,请使用 pool.imap_unordered。迭代任一调用的结果(不要包装在 list 中),并在结果到来时处理它们,内存就不会成为问题:

if __name__ == '__main__':
    pool = Pool(39)
    for result in pool.imap(f, itertools.product(pairs, repeat=16)):
        print(result)

如果您使用pool.map来实现副作用,所以您只需要将其运行到完成,但结果和排序并不重要,那么您可以通过使用imap_unordered并使用collections.deque来有效地排除“结果”而不必真正存储任何东西(一个maxlen0deque是强制迭代器运行至完成而不存储结果的最快、最低内存方式)以大幅提高性能。
from collections import deque

if __name__ == '__main__':
    pool = Pool(39)
    deque(pool.imap_unordered(f, itertools.product(pairs, repeat=16)), 0)

最后,我有点怀疑指定39个Pool工作进程;如果您使用的工作进程比CPU核心更多且获得了好处,则可能multiprocessing在IPC方面的成本超过了其获得的收益,并且使用更多的工作进程只是通过缓冲更多数据来掩盖问题。
如果您的工作主要是I/O绑定的,则可以尝试使用基于线程的池,这将避免pickling和unpickling的开销,以及父进程和子进程之间IPC的成本。与基于进程的池不同,Python线程受到GIL问题的影响,因此在Python中,CPU绑定的工作(不包括用于I/O的GIL释放调用,ctypes调用.dll/.so文件以及某些第三方扩展,如numpy,用于重型CPU工作的GIL释放)仅限于单个核心(对于CPU绑定的工作,在Python 2.x中,您通常会浪费相当一部分时间来解决GIL争用和执行上下文切换;Python 3消除了大部分浪费)。但是,如果您的工作主要是I/O绑定的,则阻塞I/O会释放GIL以允许其他线程运行,因此您可以拥有许多线程,只要大多数线程延迟在I/O上。很容易进行切换(只要您没有设计程序依赖于每个工作进程的单独地址空间,假设您可以写入“共享”状态而不影响其他工作进程或父进程),只需更改:
from multiprocessing import Pool

to:

from multiprocessing.dummy import Pool

你可以获取multiprocessing.dummy版本的池,该池基于线程而不是进程。


感谢您的澄清。我已经尝试了两种选项,但是对于这两种选项,第一个进程在 top 中显示了150%的CPU利用率,而其余进程仅忙于40%,一旦进程数量增加,它就会急剧下降(对于40个vcpus和39个进程,最高只有17%)。如何使其更有效率? - xis_one
@xis_one:有一个可以帮助的方法是将>1的“chunksize”传递给“imap”/“imap_unordered”,这样在工作进程再次阻塞在IPC之前,可以完成更多的工作。更复杂但通常更好的选择是使工作进程生成一些自己的工作,例如,如果“pairs”是全局变量,则可以为“product(pairs, repeat=10)”分配工作,然后让每个工作进程生成最后6个可能的所有项,例如,“for workitem in map(workerarg.add, product(pairs, repeat=6)):”,从而减少必须传输的数据量以执行单个任务。 - ShadowRanger
注意:我上一条评论中的“map”是普通内置的“map”,而不是池映射。如果您使用的是Python 2,您需要执行“from future_builtins import map”以获取Py3基于生成器的“map”,以避免巨大的“list”问题。 - ShadowRanger
附加说明:如果将一些工作生成推给子进程,它们将返回一个值集合而不是单个值。在这种情况下,为了使其仍然像一次获取单个值一样运行,您可能需要查看将imap*调用包装在itertools.chain.from_iterable中,以便将其从list/tuple的迭代器转换为基础值的迭代器。 - ShadowRanger

0
第二个代码示例较慢,因为您正在向39个工作进程池提交单个对。只有一个工作进程将处理您的请求,其他38个将无事可做!由于需要将数据从主线程传输到工作进程中,因此速度会变慢。
您可以“缓冲”一些对,然后执行这组对以平衡内存使用情况,但仍然可以利用多进程环境的优势。
import itertools
from multiprocessing import Pool

def foo(x):
    return sum(x)

cpus = 3
pool = Pool(cpus)
# 10 is buffer size multiplier - the number of pair that each process will get
buff_size = 10*cpus  
buff = []
for i, r in enumerate(itertools.product(range(20), range(10))):
    if (i % buff_size) == (buff_size-1):
        print pool.map(foo, buff)
        buff = []
    else:
        buff.append(r)

if len(buff) > 0:
    print pool.map(foo, buff)
    buff = []

以上代码的输出结果将会是这样的:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 5, 6, 7, 8, 9, 10, 11, 12, 13]
[6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 8, 9, 10, 11, 12, 13, 14, 15, 16]
[9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 14, 15, 16, 17, 18, 19, 20, 21, 22]
[15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 17, 18, 19, 20, 21, 22, 23, 24, 25]
[18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28]

调整 buff_size 的乘数以获得适合您系统的正确平衡!


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接