Python多进程池/星形映射的行为

4

我正在使用multiprocessing库编写一个程序来计算一些东西。需要计算大约10K个输入,每个输入的计算时间介于0.2秒到10秒之间。

我目前使用的方法是使用一个进程池:

# Inputs
signals = [list(s) for s in itertools.combinations_with_replacement(possible_inputs, 3)]

# Compute
with mp.Pool(processes = N) as p:
    p.starmap(compute_solutions, [(s, t0, tf, folder) for s in signals])
    print ("    | Computation done.")

我注意到在检查最后300/400个输入时,程序变得非常缓慢。我的问题是: Poolstarmap() 如何运作?

根据我的观察,如果我有10K个输入和N = 4(4个进程),那么前2,500个输入分配给第一个进程,接下来的2,500个分配给第二个进程,...每个进程按顺序处理其输入。

这意味着,如果有些进程在其他进程之前清除了队列,它们就不会获得要执行的新任务。

这正确吗?

如果是这样,我怎样才能拥有一个更智能的系统,可以用这个伪代码表示:

workers = Initialize N workers
tasks = A list of the tasks to perform

for task in tasks:
    if a worker is free:
        submit task to this worker
    else:
        wait

感谢您的帮助 :)

N.B:不同的map函数有什么区别。我相信存在map()imap_unordered()imapstarmap

它们之间有什么区别?我们应该在什么情况下使用哪一个?

1个回答

6
这意味着如果一些进程在其他进程之前清除了队列,则它们不会获得新的任务执行。
这正确吗?
不正确。 multiprocess.Pool() 的主要目的是将传递的工作负载分配给其工作进程池 - 这就是为什么它带有所有这些映射选项的原因 - 其各种方法之间的唯一区别在于工作负载实际上如何分布以及如何收集返回结果。 在您的情况下,使用[(s, t0, tf, folder) for s in signals] 生成的可迭代对象将被发送到池中的下一个空闲工作进程(称为compute_solutions(s, t0, tf, folder)),依次一次发送一个(或更多,如果传递了chunksize参数),直到整个可迭代对象用完为止。 您不能控制哪个工作进程执行哪个部分。 工作负载也可能不均匀分布 - 一个工作进程可能会处理比另一个工作进程更多的条目,具体取决于资源使用情况,执行速度,各种内部事件...
然而,使用multiprocessing.Poolmapimapstarmap方法,你会得到均匀有序的错觉,因为它们在内部同步每个工作进程的返回值以匹配可迭代对象(即结果的第一个元素将包含调用函数与可迭代对象的第一个元素返回的结果)。如果想看看实际发生的情况,可以尝试这些方法的异步/无序版本。

因此,默认情况下,你会得到更智能的系统,但是如果你想完全控制你的工作进程池,始终可以使用multiprocessing.Pool.apply_async()

另外,如果你想优化对可迭代对象本身的访问(因为池映射选项将消耗其大部分),可以查看this answer

最后,

它们之间有什么区别?在什么情况下应该使用其中之一?

与其让我在这里引用,不如前往官方文档,因为那里有一个相当好的解释,说明了它们之间的区别。


谢谢,我明天会更详细地查看它。 - Mathieu
1
补充一下这个答案,我不确定它是否正确。但是我观察到,如果我使用上面的代码,并且使用2个工作进程池,如果工人1接收到的工作比工人2接收到的工作短得多,那么工人1将比工人2早完成很长时间,因此即使尚未由工人2启动的工作可以转移到工人1。在我的情况下,每个工作都会生成一个文件。有时,我必须等待10/12个小时以上才能完成最后的工作(而一个工作不会超过15/20分钟)。我通过maxtaskperchild参数解决了这个问题。 - Mathieu
1
@Mathieu,你说得对,它预先为每个处理器分配了子任务集。我已经以十几种不同的方式检查过了这一点。即使其他处理器分配的所有任务都已经完成,需要很长时间才能完成的单个任务也会延迟排在它后面的其他任务。maxtasksperchild=1似乎可以解决问题。这将为每个任务产生一个新的进程,而不是预先分配任务并在整个时间段内保留进程。 - Tom
没错,但是生成一个新进程并不是一项廉价的操作,因此将该参数适应于您的任务同样很重要。 - Mathieu

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接