多进程池:map_async和imap有什么区别?

266

我正在尝试学习如何使用Python的multiprocessing包,但我不理解map_asyncimap之间的区别。

我注意到map_asyncimap都是异步执行的。那么我应该在什么情况下使用其中一个?以及我应该如何检索由map_async返回的结果?

我应该像这样使用吗?

def test():
    result = pool.map_async()
    pool.close()
    pool.join()
    return result.get()

result=test()
for i in result:
    print i
2个回答

703

imap/imap_unorderedmap/map_async 有两个主要的区别:

  1. 它们消耗你传递给它们的可迭代对象的方式。
  2. 它们将结果返回给你的方式。

map 通过将可迭代对象转换为列表(假设它不是列表),将其拆分成块,并将这些块发送到 Pool 中的工作进程中来消耗你的可迭代对象。将可迭代对象拆分成块比以一项一次地在进程之间传递每个项目的方式更加高效,特别是当可迭代对象很大时。然而,为了对其进行分块,将可迭代对象转换为列表可能会产生非常高的内存成本,因为整个列表都需要保留在内存中。

imap 不会将你提供的可迭代对象转换为列表,也不会将其拆分成块(默认情况下)。它将一个元素迭代器一次性地迭代过去,并将它们发送到工作进程中的每个元素。这意味着你不会承受将整个可迭代对象转换为列表所带来的内存负荷,但也意味着对于大型可迭代对象,性能会更慢,因为缺少分块。然而,这可以通过传递一个大于默认值 1 的 chunksize 参数来缓解。

imap/imap_unorderedmap/map_async 的另一个主要区别是,在使用 imap/imap_unordered 时,你可以在工作进程准备好它们的结果时立即开始接收结果,而不必等待它们全部完成。使用 map_async 时,会立即返回一个 AsyncResult 对象,但实际上只有在所有项目都被处理之后,才能从该对象中检索结果,并且此时它将返回与 map 相同的列表(实际上,map 在内部实现为 map_async(...).get())。没有一种方法可以获取部分结果;你要么拥有整个结果,要么什么都没有。

imapimap_unordered 都立即返回可迭代对象。使用 imap 时,结果将按照输入可迭代对象的顺序依次产生。使用 imap_unordered 时,结果将在准备就绪时立即产生,而不考虑输入可迭代对象的顺序。因此,假设你有这样一个可迭代对象:

import multiprocessing
import time

def func(x):
    time.sleep(x)
    return x + 2

if __name__ == "__main__":    
    p = multiprocessing.Pool()
    start = time.time()
    for x in p.imap(func, [1,5,3]):
        print("{} (Time elapsed: {}s)".format(x, int(time.time() - start)))

这会输出:

3 (Time elapsed: 1s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)

如果你使用p.imap_unordered而不是p.imap,你会看到:

3 (Time elapsed: 1s)
5 (Time elapsed: 3s)
7 (Time elapsed: 5s)

如果您使用p.mapp.map_async().get(),您将看到:

3 (Time elapsed: 5s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)

因此,使用imap/imap_unordered而不是map_async的主要原因是:

  1. 如果将可迭代对象转换为列表会耗尽/使用过多内存,则您的可迭代对象足够大。
  2. 您希望能够在完成所有结果之前开始处理结果。

1
apply和apply_async的使用方法如何? - Harsh Daftary
18
apply会将一个任务发送给工作进程,并一直阻塞直到完成。apply_async会将一个任务发送给工作进程,然后立即返回一个AsyncResult对象,可以使用该对象等待任务完成并获取结果。apply的实现方式是通过简单调用apply_async(...).get()来实现的。 - dano
117
这是一种应该出现在官方“池”文档中的描述,而不是现有的乏味描述。原文链接为 https://docs.python.org/3.6/library/multiprocessing.html#multiprocessing.pool.Pool。 - mins
2
@BallpointBen 当前任务完成后,它将立即转移到下一个工作。排序由父进程处理。 - dano
3
如果你完全不关心返回结果,而是将处理结果写入磁盘以便以后使用,会发生什么? - Tanner
显示剩余3条评论

16
接受的答案指出,对于imap_unordered,“结果将在准备好时立即产生”,这可能会让人推断结果将按完成顺序返回。但我想明确指出,这通常不是正确的。文档说明了结果以任意顺序返回。考虑以下程序,使用池大小为4,可迭代大小为20和块大小值为5。工作函数根据其传递的参数睡眠不同的时间量,这也确保池中的任何一个进程都不会抢占所有提交的任务。因此,我期望池中的每个进程都有5项任务要处理:
from multiprocessing import Pool
import time

def worker(x):
    print(f'x = {x}', flush=True)
    time.sleep(.1 * (20 - x))
    # return approximate completion time with passed argument:
    return time.time(), x

if __name__ == '__main__':
    pool = Pool(4)
    results = pool.imap_unordered(worker, range(20), chunksize=5)
    for t, x in results:
        print('result:', t, x)

输出:

x = 0
x = 5
x = 10
x = 15
x = 16
x = 17
x = 11
x = 18
x = 19
x = 6
result: 1621512513.7737606 15
result: 1621512514.1747007 16
result: 1621512514.4758775 17
result: 1621512514.675989 18
result: 1621512514.7766125 19
x = 12
x = 1
x = 13
x = 7
x = 14
x = 2
result: 1621512514.2716103 10
result: 1621512515.1721854 11
result: 1621512515.9727488 12
result: 1621512516.6744206 13
result: 1621512517.276999 14
x = 8
x = 9
x = 3
result: 1621512514.7695887 5
result: 1621512516.170747 6
result: 1621512517.4713914 7
result: 1621512518.6734042 8
result: 1621512519.7743165 9
x = 4
result: 1621512515.268784 0
result: 1621512517.1698637 1
result: 1621512518.9698756 2
result: 1621512520.671273 3
result: 1621512522.2716706 4

您可以清楚地看到,这些结果并没有按照完成顺序产生。例如,我先收到了工作函数早于4秒返回的1621512515.268784 0,然后才是1621512519.7743165 9。但是,如果我将chunksize值更改为1,则输出变为:
x = 0
x = 1
x = 2
x = 3
x = 4
result: 1621513028.888357 3
x = 5
result: 1621513028.9863524 2
x = 6
result: 1621513029.0838938 1
x = 7
result: 1621513029.1825204 0
x = 8
result: 1621513030.4842813 7
x = 9
result: 1621513030.4852195 6
x = 10
result: 1621513030.4872172 5
x = 11
result: 1621513030.4892178 4
x = 12
result: 1621513031.3908074 11
x = 13
result: 1621513031.4895358 10
x = 14
result: 1621513031.587289 9
x = 15
result: 1621513031.686152 8
x = 16
result: 1621513032.1877549 15
x = 17
result: 1621513032.1896958 14
x = 18
result: 1621513032.1923752 13
x = 19
result: 1621513032.1923752 12
result: 1621513032.2935638 19
result: 1621513032.3927407 18
result: 1621513032.4912949 17
result: 1621513032.5884912 16

这是按完成顺序排列的。然而,如果指定了chunksize值为1,我不确定是否可以断言imap_unordered会始终在结果可用时返回结果,尽管根据此实验情况似乎如此,因为文档没有这样的声明。

讨论

当指定chunksize为5时,20个任务被放置在单个输入队列上,供池中的4个进程以大小为5的块处理。因此,变得空闲的进程将取下下一个包含5个任务的块并逐个处理它们,然后再次变得空闲。因此,第一个进程将处理x参数0到4,第二个进程将处理x参数5到9,依此类推。这就是为什么您看到最初的x值打印为0、5、10和15。

但是,虽然x参数0的结果在x参数9的结果之前完成,但似乎结果会一起写成块,因此x参数0的结果将不会在与其在同一块中排队的x参数(即1、2、3和4)的结果可用之前返回。


4
谢谢,这是一个很好的观点。我同意你的观察结果,即似乎只有当包含该值的整个块完成时,赋予给定结果值的信息才会被传递给父级。 - dano

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接