如何并行化一个简单的Python循环?

491

这可能是一个简单的问题,但我如何在Python中并行化以下循环?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

我知道如何在Python中启动单个线程,但我不知道如何“收集”结果。

多个进程也可以 - 对于这种情况,任何最容易的方法都可以。 我目前正在使用Linux,但代码也应该在Windows和Mac上运行。

最简单的并行化此代码的方法是什么?


2
一个非常容易并行化for循环的解决方案还没有被提到 - 这可以通过简单地使用 deco 包来装饰两个函数来实现。 - 7824238
这并不是什么大事,但使用方括号初始化空列表更快且更符合Python风格:mylist = []。时间记录在此处:https://dev59.com/3m025IYBdhLWcg3wzZP2#5790954 - JohnM
15个回答

299
CPython实现目前有一个全局解释器锁(GIL),它阻止同一解释器的线程同时执行Python代码。这意味着CPython线程对于并发的I/O密集型工作负载很有用,但通常不适用于CPU密集型工作负载。命名为calc_stuff()表明您的工作负载是CPU密集型的,因此您希望在这里使用多个进程(这通常是CPU密集型工作负载的更好解决方案,无论是否存在GIL)。
有两种简单的方法可以在Python标准库中创建进程池。第一种是multiprocessing模块,可以像这样使用:
pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

请注意,由于实现方式的原因,这在交互式解释器中无法正常工作。
创建进程池的第二种方式是使用concurrent.futures.ProcessPoolExecutor
with concurrent.futures.ProcessPoolExecutor() as pool:
    out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

这个使用了底层的multiprocessing模块,所以它的行为和第一个版本完全一样。

146
既然这是被选中的答案,能否提供一个更全面的例子呢? calc_stuff的参数是什么? - Eduardo Pignatelli
7
请阅读multiprocessing模块的文档,以获取更全面的示例。Pool.map()基本上像map()一样工作,但是可以并行处理。 - Sven Marnach
7
有没有一种简单的方法可以在这段代码结构中加入一个tqdm进度条?我已经使用了tqdm(pool.imap(calc_stuff, range(0, 10 * offset, offset))),但是我没有得到完整的进度条图形。 - user8188120
2
要使用tqdm加载条,请参见此问题:https://dev59.com/8lgQ5IYBdhLWcg3w-44d - Johannes
4
为了避免其他人像我一样掉进陷阱,池的实例化和调用pool.map必须在一个函数内部进行:https://dev59.com/2lwY5IYBdhLWcg3wHkqK - kabdulla
显示剩余9条评论

189
from joblib import Parallel, delayed
def process(i):
    return i * i
    
results = Parallel(n_jobs=2)(delayed(process)(i) for i in range(10))
print(results)  # prints [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

在我的机器上(Ubuntu系统,预装有joblib包,但也可以通过pip install joblib命令进行安装),以上代码完美运行。

引用自https://blog.dominodatalab.com/simple-parallelization/


2021年3月31日更新:关于joblibmultiprocessingthreadingasyncio的说明

  • 以上代码中的joblib使用了import multiprocessing(因此使用多个进程,这通常是在多核CPU上运行CPU工作的最佳方法,例如由于全局解释器锁(GIL)的存在而被阻止的线程)。
  • 您可以让joblib使用多个线程而不是多个进程,但这只有在线程花费大量时间进行I/O操作(例如从磁盘读/写数据,发送HTTP请求)时才有效。对于I/O操作,GIL不会阻止另一个线程的执行。
  • 自Python 3.7以来,您可以使用asyncio来并行处理工作,作为threading的替代方案,但是与import threading不同的是,仅使用1个线程;优点在于asyncio具有许多有用的异步编程特性。
  • 使用多个进程会产生开销。想一想:通常,每个进程都需要初始化/加载运行计算所需的所有内容。您需要自己检查以上代码段是否改善了您的墙上时间。下面是另一个示例,我确认joblib能够产生更好的结果:
import time
from joblib import Parallel, delayed

def countdown(n):
    while n>0:
        n -= 1
    return n


t = time.time()
for _ in range(20):
    print(countdown(10**7), end=" ")
print(time.time() - t)  
# takes ~10.5 seconds on medium sized Macbook Pro


t = time.time()
results = Parallel(n_jobs=2)(delayed(countdown)(10**7) for _ in range(20))
print(results)
print(time.time() - t)
# takes ~6.3 seconds on medium sized Macbook Pro

10
我尝试了你的代码,但在我的系统上,这个代码的顺序版本大约需要半分钟,而上述并行版本需要4分钟。为什么会这样呢? - shaifali Gupta
4
谢谢您的回答!我认为这是2019年最优雅的做法。 - Heikki Pulkkinen
2
@tyrex 感谢分享!这个 joblib 包很棒,示例对我很有用。不过,在更复杂的情况下,我遇到了一个错误。https://github.com/joblib/joblib/issues/949 - Open Food Broker
2
@shaifaliGupta 我认为这取决于您的函数 processInput 每个样本需要多长时间。如果每个 i 的时间很短,您将看不到任何改进。我实际上尝试了代码,发现如果函数 processInput 需要很少的时间,则 for 循环实际上执行得更好。然而,如果您的函数 processInput 运行时间很长。使用这种并行方法要优越得多。 - aysljc
2
这个代码是可行的,但是如果你想在Windows上使用它,并且希望通过Jupyter Notebook显示输出,那么你会遇到这里的问题 https://dev59.com/RFMI5IYBdhLWcg3wi8Eb - spizwhiz
显示剩余3条评论

125

这是最简单的方法!

您可以使用asyncio。(文档可在此处找到)。它被用作多个Python异步框架的基础,提供高性能网络和Web服务器、数据库连接库、分布式任务队列等。此外,它还有高级和低级API,以适应任何类型的问题。

import asyncio

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

    return wrapped

@background
def your_function(argument):
    #code

每次调用该函数时,都会以并行方式运行,而不会使主程序进入等待状态。您也可以将其用于并行化循环。当为for循环调用时,尽管循环是顺序的,但每次迭代都会与主程序并行运行,只要解释器到达那里。

1. 在不等待任何内容的情况下并行执行循环

输入图像描述

@background
def your_function(argument):
    time.sleep(5)
    print('function finished for '+str(argument))


for i in range(10):
    your_function(i)


print('loop finished')

这会产生以下输出:

loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1

更新:2022年5月

虽然这回答了原问题,但我们可以按照被点赞评论的要求等待循环完成的方法。所以在此添加。实现的关键是:asyncio.gather()run_until_complete()。考虑以下函数:

import asyncio
import time

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

    return wrapped

@background
def your_function(argument, other_argument): # Added another argument
    time.sleep(5)
    print(f"function finished for {argument=} and {other_argument=}")

def code_to_run_before():
    print('This runs Before Loop!')

def code_to_run_after():
    print('This runs After Loop!')

2. 并行运行但等待完成

输入图像描述

code_to_run_before()                                                         # Anything you want to run before, run here!

loop = asyncio.get_event_loop()                                              # Have a new event loop

looper = asyncio.gather(*[your_function(i, 1) for i in range(1, 5)])         # Run the loop
                               
results = loop.run_until_complete(looper)                                    # Wait until finish

code_to_run_after()                                                          # Anything you want to run after, run here!

这将产生以下输出:

This runs Before Loop!
function finished for argument=2 and other_argument=1
function finished for argument=3 and other_argument=1
function finished for argument=1 and other_argument=1
function finished for argument=4 and other_argument=1
This runs After Loop!

3. 并行运行多个循环并等待完成

在此输入图片描述

code_to_run_before()                                                         # Anything you want to run before, run here!   

loop = asyncio.get_event_loop()                                              # Have a new event loop

group1 = asyncio.gather(*[your_function(i, 1) for i in range(1, 2)])         # Run all the loops you want
group2 = asyncio.gather(*[your_function(i, 2) for i in range(3, 5)])         # Run all the loops you want
group3 = asyncio.gather(*[your_function(i, 3) for i in range(6, 9)])         # Run all the loops you want

all_groups = asyncio.gather(group1, group2, group3)                          # Gather them all                                    
results = loop.run_until_complete(all_groups)                                # Wait until finish

code_to_run_after()                                                          # Anything you want to run after, run here!

这将产生以下输出:

This runs Before Loop!
function finished for argument=3 and other_argument=2
function finished for argument=1 and other_argument=1
function finished for argument=6 and other_argument=3
function finished for argument=4 and other_argument=2
function finished for argument=7 and other_argument=3
function finished for argument=8 and other_argument=3
This runs After Loop!

4. 循环逐个运行,但每个循环的迭代并行运行

此处为图片描述

code_to_run_before()                                                               # Anything you want to run before, run here!

for loop_number in range(3):

    loop = asyncio.get_event_loop()                                                # Have a new event loop

    looper = asyncio.gather(*[your_function(i, loop_number) for i in range(1, 5)]) # Run the loop
                             
    results = loop.run_until_complete(looper)                                      # Wait until finish

    print(f"finished for {loop_number=}")       

code_to_run_after()                                                                # Anything you want to run after, run here!

这会产生以下输出:

This runs Before Loop!
function finished for argument=3 and other_argument=0
function finished for argument=4 and other_argument=0
function finished for argument=1 and other_argument=0
function finished for argument=2 and other_argument=0
finished for loop_number=0
function finished for argument=4 and other_argument=1
function finished for argument=3 and other_argument=1
function finished for argument=2 and other_argument=1
function finished for argument=1 and other_argument=1
finished for loop_number=1
function finished for argument=1 and other_argument=2
function finished for argument=4 and other_argument=2
function finished for argument=3 and other_argument=2
function finished for argument=2 and other_argument=2
finished for loop_number=2
This runs After Loop!

更新:2022年6月

当前版本可能无法在某些jupyter笔记本的版本上运行。原因是jupyter笔记本使用事件循环。为了使它在这样的jupyter版本上工作,nest_asyncio(从名称就可以看出嵌套事件循环)是一种方法。只需在单元格顶部导入并应用它即可:

import nest_asyncio
nest_asyncio.apply()

以上讨论的所有功能也应该在笔记本环境中可用。


3
谢谢!我同意这是最简单的方法。 - mikey
6
好的例子,有没有办法在最终打印“loop finished”之前等待? - Koder101
2
你找到任何方法在最后打印“循环完成”了吗? - Shubham Sharma
2
@Koder101 等待最终结果时,我认为你可以将以下代码替换为:futures = [your_function(i) for i in range(10)] result = await asyncio.gather(*futures)而不是:for i in range(10): your_function(i) - Graham Hesketh
2
我能在1-2分钟内让它运行起来。现在我的代码运行速度快多了。谢谢! - JohnAllen
显示剩余6条评论

106
为了并行化一个简单的for循环,Joblib 在使用 multiprocessing 时提供了很多价值。不仅具有简短的语法,还具有透明地捆绑迭代的能力(当它们非常快时以消除开销),或者捕获子进程的 traceback,以获得更好的错误报告。
免责声明:我是 Joblib 的原作者。

3
我在jupyter中尝试使用joblib,但是它无法正常工作。在Parallel-delayed调用之后,页面停止工作了。 - Jie
2
嗨,我在使用joblib时遇到了问题(https://stackoverflow.com/questions/52166572/python-parallel-no-space-cant-pickle),你有什么线索是什么原因吗?非常感谢。 - Ting Sun
1
看起来我想试一试!能否将其与双重循环一起使用,例如 for i in range(10): for j in range(20)? - CutePoison
joblib在Windows上当迭代次数超过某个任意的数字(大约50万次)时会随机崩溃,非常不稳定。 - undefined

78

如何最简单地并行化这段代码?

使用concurrent.futures中的PoolExecutor。将原始代码与此进行对比,左右排列。首先,最简洁的方法是使用executor.map

...
with ProcessPoolExecutor() as executor:
    for out1, out2, out3 in executor.map(calc_stuff, parameters):
        ...

或通过逐个提交每个调用来解决:

...
with ThreadPoolExecutor() as executor:
    futures = []
    for parameter in parameters:
        futures.append(executor.submit(calc_stuff, parameter))

    for future in futures:
        out1, out2, out3 = future.result() # this will block
        ...

离开上下文会提示执行程序释放资源。

您可以使用线程或进程,并使用完全相同的接口。

一个可用的例子

以下是一个工作示例代码,它将演示以下内容的价值:

将此放入文件 - futuretest.py:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from time import time
from http.client import HTTPSConnection

def processor_intensive(arg):
    def fib(n): # recursive, processor intensive calculation (avoid n > 36)
        return fib(n-1) + fib(n-2) if n > 1 else n
    start = time()
    result = fib(arg)
    return time() - start, result

def io_bound(arg):
    start = time()
    con = HTTPSConnection(arg)
    con.request('GET', '/')
    result = con.getresponse().getcode()
    return time() - start, result

def manager(PoolExecutor, calc_stuff):
    if calc_stuff is io_bound:
        inputs = ('python.org', 'stackoverflow.com', 'stackexchange.com',
                  'noaa.gov', 'parler.com', 'aaronhall.dev')
    else:
        inputs = range(25, 32)
    timings, results = list(), list()
    start = time()
    with PoolExecutor() as executor:
        for timing, result in executor.map(calc_stuff, inputs):
            # put results into correct output list:
            timings.append(timing), results.append(result)
    finish = time()
    print(f'{calc_stuff.__name__}, {PoolExecutor.__name__}')
    print(f'wall time to execute: {finish-start}')
    print(f'total of timings for each call: {sum(timings)}')
    print(f'time saved by parallelizing: {sum(timings) - (finish-start)}')
    print(dict(zip(inputs, results)), end = '\n\n')

def main():
    for computation in (processor_intensive, io_bound):
        for pool_executor in (ProcessPoolExecutor, ThreadPoolExecutor):
            manager(pool_executor, calc_stuff=computation)

if __name__ == '__main__':
    main()

以下是运行 python -m futuretest 的输出结果:

processor_intensive, ProcessPoolExecutor
wall time to execute: 0.7326343059539795
total of timings for each call: 1.8033506870269775
time saved by parallelizing: 1.070716381072998
{25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}

processor_intensive, ThreadPoolExecutor
wall time to execute: 1.190223217010498
total of timings for each call: 3.3561410903930664
time saved by parallelizing: 2.1659178733825684
{25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}

io_bound, ProcessPoolExecutor
wall time to execute: 0.533886194229126
total of timings for each call: 1.2977914810180664
time saved by parallelizing: 0.7639052867889404
{'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}

io_bound, ThreadPoolExecutor
wall time to execute: 0.38941240310668945
total of timings for each call: 1.6049387454986572
time saved by parallelizing: 1.2155263423919678
{'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}

处理器密集型分析

在 Python 中进行处理器密集型计算时,使用 ProcessPoolExecutor 比使用 ThreadPoolExecutor 更高效。

由于全局解释锁(即 GIL),线程无法使用多个处理器,因此每个计算的时间和墙钟时间(经过的实际时间)会更长。

IO 绑定分析

另一方面,在执行 IO 绑定操作时,使用 ThreadPoolExecutor 比使用 ProcessPoolExecutor 更高效。

Python 的线程是真正的操作系统线程。它们可以被操作系统休眠,并在信息到达时重新唤醒。

最后的想法

我怀疑在 Windows 上使用多进程会更慢,因为 Windows 不支持forking,所以每个新进程都需要花费时间来启动。

您可以在多个进程内嵌套多个线程,但建议不要使用多个线程来启动多个进程。

如果在 Python 中遇到重型处理问题,则可以轻松地通过增加进程数量来扩展,但使用线程则不可行。


ThreadPoolExecutor是否绕过了GIL所施加的限制?此外,您不需要join()来等待执行器完成吗?或者这在上下文管理器中隐含地处理了? - PirateApp
1
不行,不行,是的,会“隐式处理”。 - Russia Must Remove Putin
因为某种原因,在扩大问题规模时,多线程非常快,但是多进程会在macOS中产生一堆卡住的进程。有什么想法吗?该进程只包含嵌套循环和数学计算,没有任何奇特的东西。 - komodovaran_
1
@komodovaran_ 一个进程是一个完整的 Python 进程,每个进程都是如此,而线程只是一个执行线程,具有自己的堆栈,与所有其他线程共享进程、其字节码和内存中的所有其他东西-这有帮助吗? - Russia Must Remove Putin
1
谢谢您提供完全可用的示例。 - thistleknot

28

使用Ray有许多优点:

  • 您可以在多台机器上并行运行,而不仅仅是多核心(使用相同的代码)。
  • 通过共享内存(和零拷贝序列化)高效处理数字数据。
  • 使用分布式调度实现高任务吞吐量。
  • 容错性。

在您的情况下,您可以启动Ray并定义一个远程函数。

import ray

ray.init()

@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3

然后并行调用它

output1, output2, output3 = [], [], []

# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)

# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)

要在集群上运行相同的示例,唯一需要更改的是对ray.init()的调用。相关文档可以在这里找到。

请注意,我正在帮助开发Ray。


9
对于考虑使用 Ray 的人来说,了解它不支持 Windows 是很重要的。虽然可以通过在 Windows 上使用 WSL(Windows 子系统)进行一些 hack 来使其工作,但如果想在 Windows 上使用,它并不是开箱即用的。 - OscarVanL
3
很遗憾,它还不支持Python 3.9。 - adonig

10
Dask futures;我很惊讶没有人提到它。
from dask.distributed import Client

client = Client(n_workers=8) # In this example I have 8 cores and processes (can also use threads if desired)

def my_function(i):
    output = <code to execute in the for loop here>
    return output

futures = []

for i in <whatever you want to loop across here>:
    future = client.submit(my_function, i)
    futures.append(future)

results = client.gather(futures)
client.close()

10

我发现joblib对我非常有用。请看下面的例子:

from joblib import Parallel, delayed
def yourfunction(k):   
    s=3.14*k*k
    print "Area of a circle with a radius ", k, " is:", s

element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))

n_jobs=-1:使用所有可用的内核


27
在发布自己的答案之前,最好先检查已有的答案。这个答案也建议使用joblib - sanyassh

7
感谢 @iuryxavier
from multiprocessing import Pool
from multiprocessing import cpu_count


def add_1(x):
    return x + 1

if __name__ == "__main__":
    pool = Pool(cpu_count())
    results = pool.map(add_1, range(10**12))
    pool.close()  # 'TERM'
    pool.join()   # 'KILL'

7
这只是一个仅包含代码的答案。我建议您添加解释,告诉读者您发布的代码的作用,以及可能可以找到其他信息的地方。 - starbeamrainbowlabs
池大小的默认值已经是 CPU 数量了:https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool进程是要使用的工作进程数。如果进程为 None,则使用 os.cpu_count() 返回的数字。 - Florian Castellane

6

concurrent包装器是由tqdm库提供的一种很好的并行化长时间运行代码的方式。tqdm通过智能进度条提供当前进度和剩余时间的反馈,对于长时间计算非常有用。

可以通过简单调用thread_map将循环重写为并发线程,或者通过简单调用process_map将其重写为并发多进程:

from tqdm.contrib.concurrent import thread_map, process_map


def calc_stuff(num, multiplier):
    import time

    time.sleep(1)

    return num, num * multiplier


if __name__ == "__main__":

    # let's parallelize this for loop:
    # results = [calc_stuff(i, 2) for i in range(64)]

    loop_idx = range(64)
    multiplier = [2] * len(loop_idx)

    # either with threading:
    results_threading = thread_map(calc_stuff, loop_idx, multiplier)

    # or with multi-processing:
    results_processes = process_map(calc_stuff, loop_idx, multiplier)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接