Python concurrent.futures.ProcessPoolExecutor: performance of .submit() vs .map()


I am using concurrent.futures.ProcessPoolExecutor to find the occurrences of a number within a number range. My intent is to investigate the amount of speed-up gained from concurrency. To benchmark performance, I have a control: a serial code to perform this task (shown below). I wrote two concurrent codes, one using concurrent.futures.ProcessPoolExecutor.submit() and the other using concurrent.futures.ProcessPoolExecutor.map(), to perform the same task. They are shown below. Suggestions on drafting the former and the latter can be seen here and here, respectively.

The task issued to all three codes was to find the number of occurrences of the digit 5 in the number range 0 to 1E8. Both .submit() and .map() were assigned 6 workers, and .map() additionally used a chunksize of 10,000. The manner of discretising the workload is identical in both concurrent codes. However, the function used to find the occurrences differs between the two codes. This is because the way arguments are passed to the function called by .submit() and by .map() is different.
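
To make this difference concrete, here is a minimal toy sketch of how arguments are passed to each method (the add function is illustrative only and is not part of the benchmark codes):

import concurrent.futures as cf

def add(a, b):
    return a + b

if __name__ == '__main__':
    with cf.ProcessPoolExecutor(max_workers=2) as executor:
        # .submit() takes one complete set of arguments per call and
        # returns a single Future.
        future = executor.submit(add, 1, 2)
        print(future.result())                # 3

        # .map() takes one iterable per parameter and pairs their
        # elements, like the built-in map().
        results = executor.map(add, [1, 2, 3], [10, 20, 30])
        print(list(results))                  # [11, 22, 33]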

All three codes reported the same count of occurrences, i.e. 56,953,279. However, the time needed to complete the task was very different. .submit() executed twice as fast as the control, while .map() took twice as long as the control to complete its task.

Questions:

  1. Is the slow performance of .map() an artefact of my code, or is it inherently slow? If the former, how can I improve it? I am just surprised that it performed so much slower than the control, as there will not be much incentive to use it.
  2. Is there a way to make the .submit() code perform even faster? A condition I have is that the function _concurrent_submit() must return an iterable with the numbers/occurrences containing the digit 5.

Benchmark results:

(figure: benchmark results of the three codes)

concurrent.futures.ProcessPoolExecutor.submit()

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
from time import time
from traceback import print_exc

def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('\n def _findmatch', nmin, nmax, number)
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

def _concurrent_submit(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunk = nmax // workers
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))
        # 2.2. NOTE: this creates an iterator over the futures in completion
        #      order but never consumes it, so it has no effect here; see the
        #      answer below on using as_completed correctly.
        cf.as_completed(futures)
        # 2.3. Consolidate result as a list and return this list.
        for future in futures:
            try:
                # future.result() is what can raise; the try must enclose it.
                for f in future.result():
                    found.append(f)
            except:
                print_exc()
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent_submit():')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers

    start = time()
    a = _concurrent_submit(nmax, number, workers)
    end = time() - start
    print('\n main')
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(a),end))

concurrent.futures.ProcessPoolExecutor.map()

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
import itertools
from time import time
from traceback import print_exc

def _findmatch(listnumber, number):    
    '''Function to find the occurrence of number in another number and return
       a string value.'''
    #print('def _findmatch(listnumber, number):')
    #print('listnumber = {0} and ref = {1}'.format(listnumber, number))
    if number in str(listnumber):
        x = listnumber
        #print('x = {0}'.format(x))
        return x 

def _concurrent_map(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunk = nmax // workers
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            numberlist = range(cstart, cstop)
            futures.append(executor.map(_findmatch, numberlist,
                                        itertools.repeat(number),
                                        chunksize=10000))
        # 2.3. Consolidate result as a list and return this list.
        try:
            # Worker exceptions are raised while iterating the map iterators,
            # so the try must enclose the iteration itself.
            for future in futures:
                for f in future:
                    if f:
                        found.append(f)
        except:
            print_exc()
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent(nmax, number):')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers

    start = time()
    a = _concurrent_map(nmax, number, workers)
    end = time() - start
    print('\n main')
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(a),end))

Serial code:

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

from time import time

def _serial(nmax, number):    
    start = time()
    match=[]
    nlist = range(nmax)
    for n in nlist:
        if number in str(n):
            match.append(n)
    end=time()-start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.

    start = time()
    a = _serial(nmax, number)
    end = time() - start
    print('\n main')
    print("found {0} in {1:.4f}sec".format(len(a),end))

Update, 13 Feb 2017:

In addition to @niemmi's answer, I am providing an answer after some personal research to show:

  1. how to further speed up @niemmi's .map() and .submit() solutions, and
  2. when ProcessPoolExecutor.map() can yield more speed-up than ProcessPoolExecutor.submit().
2 Answers


Overview:

My answer has two parts:

  • Part 1 shows how to gain additional speed-up from @niemmi's ProcessPoolExecutor.map() solution.
  • Part 2 shows when the ProcessPoolExecutor methods .submit() and .map() yield dissimilar compute times.

=======================================================================

Part 1: Gaining more speed-up from ProcessPoolExecutor.map()

Background: This section builds on @niemmi's .map() solution, which is excellent on its own. While researching his discretisation scheme to better understand how it interacts with the .map() chunksize argument, I found this interesting solution.

I regard @niemmi's definition of chunk = nmax // workers to be a definition of chunksize, i.e. a smaller size of the actual number range (the given task) that each worker in the pool has to process. Now, this definition is premised on the assumption that if a computer has x workers, dividing a task equally among them will result in optimum use of each worker, and so the total task will be completed the fastest. Therefore, the number of chunks a given task is broken into should always equal the number of pool workers. However, is this assumption correct?
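
For concreteness, with nmax = 1E8 and 6 workers this scheme produces the chunk boundaries below (a simple illustration of the discretisation, not new benchmark code):

nmax, workers = int(1E8), 6
chunk = nmax // workers                # 16666666
for i in range(workers):
    cstart = chunk * i
    cstop = chunk * (i + 1) if i != workers - 1 else nmax
    print(i, cstart, cstop)            # the last chunk absorbs the 4-number remainder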

Proposition: Here, I propose that the above assumption does not always lead to the fastest compute time with ProcessPoolExecutor.map(). Rather, discretising a task into an amount greater than the number of pool workers can give a speed-up, i.e. faster completion of the given task.

Experiment: I have modified @niemmi's code to allow the number of discretised tasks to exceed the number of pool workers. This code is given below and was used to find the number of times the digit 5 appears in the number range 0 to 1E8. I executed this code using 1, 2, 4 and 6 pool workers, and for various ratios of the number of discretised tasks to the number of pool workers. For each scenario, 3 runs were made and the compute times recorded. "Speed-up" here is defined as the mean compute time using an equal number of chunks and pool workers over the mean compute time when the number of discretised tasks is greater than the number of pool workers.
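
A minimal sketch of such a measurement loop (it assumes the _concurrent_map function defined below; the sweep values are illustrative):

from time import time

def sweep(nmax, number, workers_list=(1, 2, 4, 6), factors=(1, 2, 7, 14, 28), runs=3):
    '''Print the mean wall time of _concurrent_map for each combination of
       pool workers and chunks-to-workers ratio.'''
    for workers in workers_list:
        for factor in factors:
            num_of_chunks = factor * workers
            times = []
            for _ in range(runs):
                start = time()
                _concurrent_map(nmax, number, workers, num_of_chunks)
                times.append(time() - start)
            print(workers, factor, sum(times) / runs)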

Results:

(figure: compute time and speed-up as a function of number of chunks / number of workers)

  1. The figure on the left shows the compute times for all the scenarios mentioned in the experiment section. It shows that the compute time needed at number of chunks / number of workers = 1 is always greater than the compute time needed when number of chunks > number of workers. That is, the former case is always less efficient than the latter.

    The figure on the right shows that a speed-up of 1.2 times or more is gained when the number of chunks / number of workers reaches a threshold of 14 or more. Interestingly, the speed-up trend also occurred when ProcessPoolExecutor.map() was executed with a single worker.

    Conclusion: When customising the number of discrete tasks that ProcessPoolExecutor.map() should use to solve a given task, it is prudent to ensure that this number is greater than the number of pool workers, as this practice shortens the compute time.

    concurrent.futures.ProcessPoolExecutor.map() code (revised section only)

    def _concurrent_map(nmax, number, workers, num_of_chunks):
        '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
           find the occurrences of a given number in a number range in a parallelised
           manner.'''
        # 1. Local variables
        start = time()
        chunksize = nmax // num_of_chunks
        futures = []
        found =[]
        #2. Parallelization
        with cf.ProcessPoolExecutor(max_workers=workers) as executor:
            # 2.1. Discretise workload and submit to worker pool
            cstart = (chunksize * i for i in range(num_of_chunks))
            cstop = (chunksize * i if i != num_of_chunks else nmax
                     for i in range(1, num_of_chunks + 1))
            futures = executor.map(_findmatch, cstart, cstop,
                                   itertools.repeat(number))
            # 2.2. Consolidate result as a list and return this list.
            try:
                # Worker exceptions surface while iterating the map iterator,
                # so the try must enclose the iteration itself.
                for future in futures:
                    #print('type(future)=',type(future))
                    for f in future:
                        if f:
                            found.append(f)
            except:
                print_exc()
            foundsize = len(found)
            end = time() - start
            print('\n within statement of def _concurrent(nmax, number):')
            print("found {0} in {1:.4f}sec".format(foundsize, end))
        return found
    
    if __name__ == '__main__':
        nmax = int(1E8) # Number range maximum.
        number = str(5) # Number to be found in number range.
        workers = 4     # Pool of workers
        chunks_vs_workers = 14 # A factor of =>14 can provide optimum performance  
        num_of_chunks = chunks_vs_workers * workers
    
        start = time()
        a = _concurrent_map(nmax, number, workers, num_of_chunks)
        end = time() - start
        print('\n main')
        print('nmax={}, workers={}, num_of_chunks={}'.format(
              nmax, workers, num_of_chunks))
        print('workers = ', workers)
        print("found {0} in {1:.4f}sec".format(len(a),end))
    

    Part 2: Total compute time from using the ProcessPoolExecutor methods .submit() and .map() can be dissimilar when returning a sorted/ordered result list.

    Background: I have amended both the .submit() and .map() codes to allow an "apple-to-apple" comparison of their compute times and the ability to visualise the compute time of the main code, the compute time of the _concurrent method that performs the concurrent operations, and the compute time of each discretised task/worker called by the _concurrent method. Furthermore, the _concurrent method in these codes was structured to return an unordered and an ordered list of the results directly from the future objects of .submit() and the iterator of .map(). The source code is provided below (hope it helps you).

    Experiment: These two newly improved codes were used to perform the same experiment described in Part 1, except that only 6 pool workers were considered, and Python's built-in list and sorted methods were used to return an unordered and an ordered list of the results, respectively, to the main section of the code.

    Findings: (figure: .submit() vs .map(), and list vs sorted consolidation)

    1. From the _concurrent method's results, it is evident that the compute times needed to create all the Future objects of ProcessPoolExecutor.submit() and to create the iterator of ProcessPoolExecutor.map(), as a function of the number of discretised tasks over the number of pool workers, are equivalent. This result simply means that the ProcessPoolExecutor methods .submit() and .map() are equally efficient/fast.
    2. Comparing the compute times of main and of its _concurrent method, we can see that main ran longer than its _concurrent method. This is to be expected, as their time difference reflects the amount of compute time of the list and sorted methods (and of the other methods encased within them). Clearly, the list method took less compute time to return a result list than the sorted method. The average compute time of the list method for both the .submit() and .map() codes was about 0.47 sec. The average compute time of the sorted method for the .submit() and .map() codes was 1.23 sec and 1.01 sec, respectively. In other words, the list method performed 2.62 and 2.15 times faster than the sorted method for the .submit() and .map() codes, respectively. (A standalone sketch of this list-versus-sorted cost follows this list.)
    3. It is not clear why the sorted method generated an ordered list faster from .map() than from .submit() as the number of discretised tasks increased beyond the number of pool workers, except when the number of discretised tasks equalled the number of pool workers. That said, these findings show that the decision to use the equally fast .submit() or .map() methods can be encumbered by the sorting method. For example, if the intent is to generate an ordered list in the shortest possible time, ProcessPoolExecutor.map() should be preferred over ProcessPoolExecutor.submit(), as .map() allows the shortest total compute time.
    4. The discretisation scheme presented in Part 1 was applied here to speed up the performance of both the .submit() and .map() methods. The amount of speed-up can be as much as 20 % over the case where the number of discretised tasks equals the number of pool workers.
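
    As a standalone illustration of the list-versus-sorted consolidation cost in finding 2 (synthetic per-chunk results, not the benchmark data above):

    from itertools import chain
    from time import time

    # Synthetic stand-ins for the per-chunk result lists returned by the
    # workers; the interleaved ranges make the concatenation unsorted.
    chunks = [list(range(i, 2000000, 20)) for i in range(20)]

    start = time()
    unordered = list(chain.from_iterable(chunks))
    print('list:   {0:.4f}sec'.format(time() - start))

    start = time()
    ordered = sorted(chain.from_iterable(chunks))
    print('sorted: {0:.4f}sec'.format(time() - start))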

    Improved .map() code

    #!/usr/bin/python3.5
    # -*- coding: utf-8 -*-
    
    import concurrent.futures as cf
    from time import time
    from itertools import repeat, chain 
    
    
    def _findmatch(nmin, nmax, number):
        '''Function to find the occurrence of number in range nmin to nmax and return
           the found occurrences in a list.'''
        start = time()
        match=[]
        for n in range(nmin, nmax):
            if number in str(n):
                match.append(n)
        end = time() - start
        #print("\n def _findmatch {0:<10} {1:<10} {2:<3} found {3:8} in {4:.4f}sec".
        #      format(nmin, nmax, number, len(match),end))
        return match
    
    def _concurrent(nmax, number, workers, num_of_chunks):
        '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
           find the occurrences of a given number in a number range in a concurrent
           manner.'''
        # 1. Local variables
        start = time()
        chunksize = nmax // num_of_chunks
        #2. Parallelization
        with cf.ProcessPoolExecutor(max_workers=workers) as executor:
            # 2.1. Discretise workload and submit to worker pool
            cstart = (chunksize * i for i in range(num_of_chunks))
            cstop = (chunksize * i if i != num_of_chunks else nmax
                     for i in range(1, num_of_chunks + 1))
            futures = executor.map(_findmatch, cstart, cstop, repeat(number))
        end = time() - start
        print('\n within statement of def _concurrent_map(nmax, number, workers, num_of_chunks):')
        print("found in {0:.4f}sec".format(end))
        return list(chain.from_iterable(futures)) #Return an unordered result list
        #return sorted(chain.from_iterable(futures)) #Return an ordered result list
    
    if __name__ == '__main__':
        nmax = int(1E8) # Number range maximum.
        number = str(5) # Number to be found in number range.
        workers = 6     # Pool of workers
        chunks_vs_workers = 30 # A factor of =>14 can provide optimum performance 
        num_of_chunks = chunks_vs_workers * workers
    
        start = time()
        found = _concurrent(nmax, number, workers, num_of_chunks)
        end = time() - start
        print('\n main')
        print('nmax={}, workers={}, num_of_chunks={}'.format(
              nmax, workers, num_of_chunks))
        #print('found = ', found)
        print("found {0} in {1:.4f}sec".format(len(found),end))    
    

    Improved .submit() code.
    This code is the same as the .map code above, except that you replace the _concurrent method with the following:

    def _concurrent(nmax, number, workers, num_of_chunks):
        '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
           find the occurrences of a given number in a number range in a concurrent
           manner.'''
        # 1. Local variables
        start = time()
        chunksize = nmax // num_of_chunks
        futures = []
        #2. Parallelization
        with cf.ProcessPoolExecutor(max_workers=workers) as executor:
            # 2.1. Discretise workload and submit to worker pool
            for i in range(num_of_chunks):
                cstart = chunksize * i
                cstop = chunksize * (i + 1) if i != num_of_chunks - 1 else nmax
                futures.append(executor.submit(_findmatch, cstart, cstop, number))
        end = time() - start
        print('\n within statement of def _concurrent_submit(nmax, number, workers, num_of_chunks):')
        print("found in {0:.4f}sec".format(end))
        return list(chain.from_iterable(f.result() for f in cf.as_completed(
            futures))) #Return an unordered list
        #return sorted(chain.from_iterable(f.result() for f in cf.as_completed(
        #    futures))) #Return an ordered list
    

    =======================================================================



You're comparing different things here. With map you produce all the 1E8 numbers and transfer them to the worker processes. This takes a lot of time compared to the actual execution. With submit you just create 6 sets of parameters that get transferred.
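
A rough, scaled-down sketch of the argument-serialisation cost alone (it ignores the queue and process overheads, and uses 1E6 items instead of 1E8 to keep the demo quick):

import pickle
from time import time

n_items = int(1E6)   # scaled down from 1E8

start = time()
for n in range(n_items):          # one argument per number, as the original .map() call requires
    pickle.dumps(n)
print('per-number arguments: {0:.4f}sec'.format(time() - start))

start = time()
for i in range(6):                # one small argument tuple per worker, as .submit() does
    pickle.dumps((i * n_items // 6, (i + 1) * n_items // 6, '5'))
print('per-chunk arguments:  {0:.4f}sec'.format(time() - start))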

If you change map to operate on the same principle, you'll get numbers that are close to each other:

def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('\n def _findmatch', nmin, nmax, number)
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

def _concurrent_map(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunk = nmax // workers
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        cstart = (chunk * i for i in range(workers))
        cstop = (chunk * i if i != workers else nmax for i in range(1, workers + 1))
        futures = executor.map(_findmatch, cstart, cstop, itertools.repeat(number))

        # 2.3. Consolidate result as a list and return this list.
        try:
            # Worker exceptions surface while iterating the map iterator,
            # so the try must enclose the iteration itself.
            for future in futures:
                for f in future:
                    found.append(f)
        except:
            print_exc()
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent(nmax, number):')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

You can improve the performance of the submit version by using as_completed correctly. For a given iterable of futures, it returns an iterator that yields the futures in the order they complete.
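
A minimal, self-contained illustration of as_completed, with sleep-based toy tasks unrelated to the benchmark:

import concurrent.futures as cf
from time import sleep

def work(seconds):
    sleep(seconds)
    return seconds

if __name__ == '__main__':
    with cf.ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(work, s) for s in (3, 1, 2)]
        for future in cf.as_completed(futures):
            print('completed:', future.result())   # 1, 2, 3: completion order, not submission order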

You can also skip copying the data into another array, and instead use itertools.chain.from_iterable to combine the results from the futures into a single iterable:

import concurrent.futures as cf
import itertools
from time import time
from traceback import print_exc
from itertools import chain

def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('\n def _findmatch', nmin, nmax, number)
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

def _concurrent_submit(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    chunk = nmax // workers
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))

    return chain.from_iterable(f.result() for f in cf.as_completed(futures))

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers

    start = time()
    a = _concurrent_submit(nmax, number, workers)
    end = time() - start
    print('\n main')
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(sum(1 for x in a),end))

I've just studied your .map() solution. Wow... the way you rewrote cstart and cstop to apply them to _findmatch() and .map() is ingenious. I never thought it could be done that way. This is my first time using .map(), which is why the _findmatch in my .map() code differs from the _findmatch in the .submit() and control codes, leading to an apples-to-oranges comparison. ;) I tried to include chunksize in your .map() version but found that it slows performance: the larger the chunksize, the slower the .map code performs. Can you help me understand why? - Sun Bear
@SunBear If you used my version of map, there should be a simple explanation. Say your machine has 2 cores, which means that if you parallelise the work correctly it can be done in half the time. Now, the map implementation splits the work into 6 parts. Say you define chunksize=5; then one of the workers gets 5 of the 6 parts, resulting in 5/6 of the work being processed on one core. Generally it makes sense to use a larger chunksize, but only if it still allows the work to be divided evenly between the workers. Try using the original submit-style code with a lower chunk size and you should see it become slower. - niemmi
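
A rough way to picture this batching (plain arithmetic, not the executor's internals):

tasks = 6        # argument tuples handed to .map()
chunksize = 5    # .map() groups the argument tuples into batches of this size
batches = [min(chunksize, tasks - i) for i in range(0, tasks, chunksize)]
print(batches)   # [5, 1] -> one worker receives 5 of the 6 parts in one batch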
I follow your point up to "5/6 of the work being processed on one core". What happens when chunksize = 10? Does that mean all 6 parts go into 1 worker while the other workers sit idle? And what does the excess chunksize mean then? Sorry for being slow. By the way, while trying to figure out how your chunk size and the .map() chunksize jointly affect compute speed, I discovered something interesting; see my supplementary answer. I believe this interaction causes number of chunks / number of workers << 1, falling into the left side of my chart, i.e. longer compute times. - Sun Bear
I have benchmarked your .submit() code. Using 6 workers, over 5 runs, the average compute time of your code was about 1.4 times faster than that of the .submit() code I posted in my question. The average time of your code was 6.41 sec. Wow... amazing! Even with the changes I suggested, comparing the .submit() code against the .map() code, the .submit() code is still faster. - Sun Bear
@SunBear Note that in my solution the numbers yielded from the iterable are unordered. The time savings come from not copying the numbers to a list in the main process, and from not having to wait for the chunk containing the numbers 5xxxxxxx to complete before consuming the later results. I'll try to expand my answer later based on the comments and your answer. - niemmi
