我不会将concurrent.futures
称为更“高级”的接口 - 它是一个更简单的接口,而且不论您使用多个线程还是多个进程作为底层并行化机制,它的工作方式基本相同。
因此,和几乎所有“简单接口”一样,涉及到相同的权衡:它具有较浅的学习曲线,这在很大程度上仅仅是因为有更少的可供学习的内容; 但是,由于提供的选项更少,它可能会以丰富的界面无法做到的方式让你最终感到沮丧。
就CPU密集型任务而言,这种说法过于缺乏具体信息,没有什么有意义的可言。 对于CPython下的CPU密集型任务,您需要多个进程而不是多个线程才能有任何获得加速的机会。 但是,您获得多少(如果有)加速取决于您的硬件、操作系统以及特定任务所要求的进程间通信量。 在底层,所有进程间并行化机制都依赖于相同的操作系统原语 - 您用于获取这些原语的高级API不是底线速度的主要因素。
编辑:示例
这是您引用文章中显示的最终代码,但我添加了一个必需的导入声明才能使其工作:
from concurrent.futures import ProcessPoolExecutor
def pool_factorizer_map(nums, nprocs):
# Let the executor divide the work among processes by using 'map'.
with ProcessPoolExecutor(max_workers=nprocs) as executor:
return {num:factors for num, factors in
zip(nums,
executor.map(factorize_naive, nums))}
这里是使用multiprocessing
来实现完全相同的代码:
import multiprocessing as mp
def mp_factorizer_map(nums, nprocs):
with mp.Pool(nprocs) as pool:
return {num:factors for num, factors in
zip(nums,
pool.map(factorize_naive, nums))}
请注意,Python 3.3 中新增了将 multiprocessing.Pool
对象用作上下文管理器的功能。
至于哪个更容易使用,它们基本上是相同的。
一个区别在于,Pool
支持许多不同的操作方式,直到你已经攀登了学习曲线的相当一段路程后,你可能才意识到它可以有多么容易。
再次强调,所有这些不同的方式既是一种优点也是一种缺点。它们是一种优点,因为某些情况下可能需要灵活性。它们是一种缺点,因为“最好只有一种明显的方法”。一个完全使用(如果可能的话)concurrent.futures
的项目在长期维护方面可能会更容易,因为其最小 API 的使用没有过多的新奇之处。
提交多个任务
每个包都有类似于内置的map和itertools.starmap函数的类似物。如果你有一个接受单个参数的工作函数,那么可以使用map方法在任何一个包中提交多个任务:
def worker_function(x):
# Return the square of the passed argument:
return x ** 2
# multiprocessing.pool example:
from multiprocessing import Pool
with Pool() as pool:
squares = pool.map(worker_function, (1, 2, 3, 4, 5, 6))
# concurrent.futures example:
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
squares = list(executor.map(worker_function, (1, 2, 3, 4, 5, 6)))
multiprocessing.pool.Pool.map
方法返回一个列表,而concurrent.futures.ProcessPoolExecutor.map
方法返回一个迭代器,就像内置的map
方法一样。map
方法都接受一个chunksize参数,将提交的任务分批处理成“块”,从任务输入队列中取出这些块,以便池进程在获取下一个块之前处理完当前块中的所有任务。这样可以减少对输入任务队列的写入和读取次数,但每次操作的数据量会更大。对于传递给map
方法的大型可迭代对象,分块处理任务可以极大地提高性能。concurrent.futures.ProcessPoolExecutor
的默认chunksize值,则默认值为1,即不进行分块。对于multiprocessing.pool.Pool
,默认值为None
,这将导致该类根据池大小和传递的可迭代对象中的元素数量计算出一个“合适”的chunksize。在撰写本文时,chunksize的计算方式大致为int(math.ceil(iterable_size / (4 * pool_size)))
。当使用这些包进行多线程处理时(稍后会简要讨论),两个包的默认chunksize值都为1。concurrent.futures
包更加方便,因为它的map
方法可以传递多个可迭代对象。def worker_function(x, y):
return x * y
x_values = (1, 2, 3)
y_values = (9, -2, -8)
with concurrent.futures.ProcessPoolExecutor() as executor:
results = list(executor.map(worker_function, x_values, y_values))
multiprocessing
包中的starmap
方法,并且如果每个参数有单独的可迭代对象,则这些参数必须被"zipped"在一起。def worker_function(x, y):
return x * y
x_values = (1, 2, 3)
y_values = (9, -2, -8)
with multiprocessing.Pool() as pool:
results = pool.starmap(worker_function, zip(x_values, y_values))
zip
函数。def worker_function(x, y):
return x * y
args = (
(1, 9), # first x, y pair of arguments
(2, -2),
(3, -8)
)
with multiprocessing.Pool() as pool:
results = pool.starmap(worker_function, args)
尽快获取任务结果
当提交一批任务时,有时候希望在任务结果(即返回值)可用时立即获取。这两种方法都提供了通过回调机制通知已提交任务的结果可用的功能:
使用multiprocessing.Pool
:
import multiprocessing as mp
def worker_process(i):
return i * i # square the argument
def process_result(return_value):
print(return_value)
def main():
pool = mp.Pool()
for i in range(10):
pool.apply_async(worker_process, args=(i,), callback=process_result)
pool.close()
pool.join()
if __name__ == '__main__':
main()
concurrent.futures
也可以完成相同的操作,尽管有些笨拙。import concurrent.futures
def worker_process(i):
return i * i # square the argument
def process_result(future):
print(future.result())
def main():
executor = concurrent.futures.ProcessPoolExecutor()
futures = [executor.submit(worker_process, i) for i in range(10)]
for future in futures:
future.add_done_callback(process_result)
executor.shutdown()
if __name__ == '__main__':
main()
Future
实例。然后必须将回调添加到Future
中。最后,当调用回调时,传递的参数是已完成任务的Future
实例,并且必须调用result
方法来获取实际的返回值。但是使用concurrent.futures
模块,实际上根本不需要使用回调。可以使用as_completed
方法:import concurrent.futures
def worker_process(i):
return i * i # square the argument
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = [executor.submit(worker_process, i) for i in range(10)]
for future in concurrent.futures.as_completed(futures):
print(future.result())
if __name__ == '__main__':
main()
Future
实例,将返回值与原始传递给worker_process
的参数关联起来。import concurrent.futures
def worker_process(i):
return i * i # square the argument
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = {executor.submit(worker_process, i): i for i in range(10)}
for future in concurrent.futures.as_completed(futures):
i = futures[future] # retrieve the value that was squared
print(i, future.result())
if __name__ == '__main__':
main()
import multiprocessing as mp
def worker_process(i):
return i * i # square the argument
def compute_chunksize(pool_size, iterable_size):
if iterable_size == 0:
return 0
chunksize, extra = divmod(iterable_size, pool_size * 4)
if extra:
chunksize += 1
return chunksize
def main():
cpu_count = mp.cpu_count()
N = 100
chunksize = compute_chunksize(cpu_count, N)
with mp.Pool() as pool:
for result in pool.imap_unordered(worker_process, range(N), chunksize=chunksize):
print(result)
if __name__ == '__main__':
main()
ProcessPoolExecutor
的map
方法与multiprocessing
包中的Pool.imap
方法有一些相似之处,但也存在一个重要的区别。这些相似之处包括:首先,该方法不会将传入的生成器表达式转换为列表,以计算有效的chunksize值。因此,chunksize参数默认为1。如果传入的iterables较大,则应考虑指定适当的chunksize值。其次,ProcessPoolExecutor.map
方法返回一个结果iterable,需要进行迭代以检索工作函数的所有返回值。这些结果在生成后立即可用,但与Pool.imap
方法有一个区别:直到所有传递给它的输入iterable元素都被迭代并放入任务队列之前,ProcessPoolExecutor.map
方法才会返回其结果iterable。因此,在此发生之前,您无法开始从工作函数中检索结果,即使在迭代和排队所有输入任务之时可能已经生成了许多结果。另外,如果您的情况是输入的生成速度比工作函数生成结果的速度快,那么输入任务队列的存储需求可能会变得非常大。
提交任务并阻塞直到完成
multiprocessing.Pool
类有一个名为apply
的方法,它将任务提交给池,并在结果准备好之前阻塞。返回值只是传递给apply
函数的工作函数的返回值。例如:
import multiprocessing as mp
def worker_process(i):
return i * i # square the argument
def main():
with mp.Pool() as pool:
print(pool.apply(worker_process, args=(6,)))
print(pool.apply(worker_process, args=(4,)))
if __name__ == '__main__':
main()
并发编程库中的concurrent.futures.ProcessPoolExecutor
类没有对应的方法。您需要使用submit
提交任务,然后对返回的Future
实例调用result
方法来获取结果。这样做并不困难,但在适合阻塞式任务提交的情况下,Pool.apply
方法更为方便。这种情况通常发生在需要进行线程处理的场景中,因为大部分工作都是I/O密集型的,只有一个或几个函数是CPU密集型的。主程序先创建一个multiprocessing.Pool
实例,并将其作为参数传递给所有线程。当线程需要调用CPU密集型函数时,现在可以使用Pool.apply
方法来运行该函数,从而在另一个进程中运行代码,并释放当前进程以允许其他线程运行。
多进程还是多线程?
很多人对于concurrent.futures模块中的两个类ProcessPoolExecutor和ThreadPoolExecutor拥有相同接口这一点非常关注。这是一个很好的特性。但是multiprocessing模块也有一个未记录的ThreadPool类,其接口与Pool类完全相同。>>> from multiprocessing.pool import Pool
>>> from multiprocessing.pool import ThreadPool
>>> dir(Pool)
['Process', '__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_check_running', '_get_sentinels', '_get_tasks', '_get_worker_sentinels', '_guarded_task_generation', '_handle_results', '_handle_tasks', '_handle_workers', '_help_stuff_finish', '_join_exited_workers', '_maintain_pool', '_map_async', '_repopulate_pool', '_repopulate_pool_static', '_setup_queues', '_terminate_pool', '_wait_for_updates', '_wrap_exception', 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 'map', 'map_async', 'starmap', 'starmap_async', 'terminate']
>>> dir(ThreadPool)
['Process', '__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_check_running', '_get_sentinels', '_get_tasks', '_get_worker_sentinels', '_guarded_task_generation', '_handle_results', '_handle_tasks', '_handle_workers', '_help_stuff_finish', '_join_exited_workers', '_maintain_pool', '_map_async', '_repopulate_pool', '_repopulate_pool_static', '_setup_queues', '_terminate_pool', '_wait_for_updates', '_wrap_exception', 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 'map', 'map_async', 'starmap', 'starmap_async', 'terminate']
>>>
# This Pool is a function with the same interface as the
# multiprocessing.pool.ThreadPool.__init__ initializer and returns a
# mulitprocessing.pool.ThreadPool instance:
from multiprocessing.dummy import Pool
逐个提交任务和超时
您可以使用ProcessPoolExecutor.submit
或Pool.apply_async
来逐个提交单个任务,它们分别返回一个Future
实例或AsyncResult
实例,并且可以指定超时值以获取结果:
from concurrent.futures import ProcessPoolExecutor, TimeoutError
from time import sleep
def worker_1():
while True:
print('hanging')
sleep(1)
def main():
with ProcessPoolExecutor(1) as pool:
future = pool.submit(worker_1)
try:
future.result(3) # kill task after 3 seconds?
except TimeoutError:
print('timeout')
if __name__ == '__main__':
main()
print("return from main()")
输出:
hanging
hanging
hanging
timeout
hanging
hanging
hanging
hanging
hanging
hanging
hanging
etc.
future.result(3)
时,主要的过程将在3秒后抛出TimeoutError
异常,因为提交的任务在该时间段内未完成。但是任务仍在继续运行,占用了进程,并且with ProcessPoolExecutor(1) as pool:
块永远不会退出,因此程序无法终止。from multiprocessing import Pool, TimeoutError
from time import sleep
def worker_1():
while True:
print('hanging')
sleep(1)
def main():
with Pool(1) as pool:
result = pool.apply_async(worker_1, args=())
try:
result.get(3) # kill task after 3 seconds?
except TimeoutError:
print('timeout')
if __name__ == '__main__':
main()
print("return from main()")
输出:
hanging
hanging
hanging
timeout
return from main()
with
块不会被阻止退出,因此程序正常终止。原因是当块退出时,Pool
实例的上下文管理器将执行terminate
调用,从而立即终止池中的所有进程。这与ProcessPoolExecutor
实例的上下文处理程序形成对比,后者在其所管理的块退出时执行shutdown(wait=True)
以等待池中所有进程的终止。如果您使用上下文处理程序来处理池的终止和可能的超时情况,那么multiprocessing.Pool
似乎更具优势。更新:在Python 3.9中,shutdown
方法新增了一个名为cancel_futures的参数。因此,如果您显式调用shutdown(cancel_futures=True)
而不是依赖于使用上下文处理程序时隐式调用shutdown
的默认行为,您可以终止任何等待运行的任务(但不能终止已经执行的任务)。multiprocessing.Pool
的上下文处理程序只调用terminate
而不是close
后跟join
,因此您必须确保在退出with
块之前,您提交的所有作业都已完成,例如通过使用阻塞的同步调用(如map
)提交作业或在对apply_async
的调用返回的AsyncResult
对象上调用get
,或者迭代对imap
的调用的结果,或者在池实例上调用close
后跟join
。ProcessPoolExecutor
时,没有办法在超时任务完成之前退出,但是您可以取消尚未运行的已提交任务的启动。在下面的演示中,我们有一个大小为1的池子,这样工作只能连续运行。我们依次提交3个作业,其中前两个作业由于调用time.sleep(3)
而需要3秒钟才能运行。我们立即尝试取消前两个作业。第一次取消尝试失败,因为第一个作业已经在运行。但是由于池子只有一个进程,第二个作业必须等待3秒钟,直到第一个作业完成后才能开始运行,因此取消成功。最后,作业3将在作业1完成后几乎立即开始和结束,这将大约在我们开始提交作业后的3秒钟左右。from concurrent.futures import ProcessPoolExecutor
import time
def worker1(i):
time.sleep(3)
print('Done', i)
def worker2():
print('Hello')
def main():
with ProcessPoolExecutor(max_workers=1) as executor:
t = time.time()
future1 = executor.submit(worker1, 1)
future2 = executor.submit(worker1, 2)
future3 = executor.submit(worker2)
# this will fail since this task is already running:
print(future1.cancel())
# this will succeed since this task hasn't started (it's waiting for future1 to complete):
print(future2.cancel())
future3.result() # wait for completion
print(time.time() - t)
if __name__ == '__main__':
main()
输出:
False
True
Done 1
Hello
3.1249606609344482
我喜欢 concurrent.futures
,主要是因为多个函数参数的迭代器:multiprocessing
在获取函数的多个参数时有些不可靠(没有istarmap()
相当于starmap()
):
import multiprocessing as mp
def power_plus_one(x, y):
return (x**y) + 1
def wrapper(t):
return power_plus_one(*t)
with mp.Pool() as pool:
r = list(pool.imap(wrapper, [(0, 1), (2, 2)]))
print(r)
我发现imap()
/imap_unordered()
在像tqdm
这样的进度条或大型计算的时间估算中非常有用。在concurrent.futures
中,这非常方便:
def power_plus_one(x, y):
return (x**y) + 1
o = dict() # dict for output
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = {executor.submit(power_plus_one, x, y): (x, y) for x, y in [(0, 1), (2, 2)]}
for future in concurrent.futures.as_completed(futures):
i = futures[future]
o[i] = future.result()
print(o)
for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
...
根据我的经验,在Windows操作系统上,相对于concurrent.futures,我使用多进程模块时遇到了很多问题。
其中两个主要差异是:
例如: (获取结果)
with concurrent.futures.ProcessPoolExecutor() as executor:
f1 = executor.submit(some_function, parameter_to_be_passed)
print(f1.result())
如果你从some_function()
返回任何值,你可以直接使用f1.result()
进行捕获/存储。在“multiprocessing”模块中,需要额外的步骤才能完成相同的操作。
如果您正在Linux系统上运行,则可能不会发生挂起情况,但“multiprocessing”模块中的执行复杂度仍然更高。
此外,需要指出的是,我的任务非常消耗CPU资源。
个人建议使用concurrent.futures。
multiprocessing.pool
不需要任何“额外”的步骤:async_result = pool.submit(some_function, args=(parameter1, parameter2, ...)); print(async_result.get())
- Boobooconcurrent.futures
给你更多的控制,例如:
# Created by BaiJiFeiLong@gmail.com at 2021/10/19 10:37
import concurrent.futures
import multiprocessing.pool
import random
import threading
import time
def hello(name):
time.sleep(random.random())
return f"Hello {name} {threading.current_thread()} "
print("ThreadPool:")
pool = multiprocessing.pool.ThreadPool(4)
for args, result in pool.imap_unordered(lambda x: (x, hello(x)), range(10)):
print(args, "=>", result)
print("\nThreadPoolExecutor:")
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
futures = {executor.submit(hello, x): x for x in range(10)}
for future in concurrent.futures.as_completed(futures):
print(futures[future], "=>", future.result()
示例输出:
ThreadPool:
1 => Hello 1 <DummyProcess(Thread-2, started daemon 29700)>
0 => Hello 0 <DummyProcess(Thread-1, started daemon 29688)>
2 => Hello 2 <DummyProcess(Thread-3, started daemon 19680)>
6 => Hello 6 <DummyProcess(Thread-3, started daemon 19680)>
3 => Hello 3 <DummyProcess(Thread-4, started daemon 33028)>
4 => Hello 4 <DummyProcess(Thread-2, started daemon 29700)>
5 => Hello 5 <DummyProcess(Thread-1, started daemon 29688)>
9 => Hello 9 <DummyProcess(Thread-2, started daemon 29700)>
8 => Hello 8 <DummyProcess(Thread-4, started daemon 33028)>
7 => Hello 7 <DummyProcess(Thread-3, started daemon 19680)>
ThreadPoolExecutor:
0 => Hello 0 <Thread(ThreadPoolExecutor-0_0, started daemon 30764)>
1 => Hello 1 <Thread(ThreadPoolExecutor-0_1, started daemon 36220)>
2 => Hello 2 <Thread(ThreadPoolExecutor-0_2, started daemon 13120)>
4 => Hello 4 <Thread(ThreadPoolExecutor-0_0, started daemon 30764)>
3 => Hello 3 <Thread(ThreadPoolExecutor-0_3, started daemon 30260)>
8 => Hello 8 <Thread(ThreadPoolExecutor-0_3, started daemon 30260)>
5 => Hello 5 <Thread(ThreadPoolExecutor-0_1, started daemon 36220)>
6 => Hello 6 <Thread(ThreadPoolExecutor-0_2, started daemon 13120)>
7 => Hello 7 <Thread(ThreadPoolExecutor-0_0, started daemon 30764)>
9 => Hello 9 <Thread(ThreadPoolExecutor-0_3, started daemon 30260)>
ProcessPoolExecutor
比Pool
具有更多的选项,因为ProcessPoolExecutor.submit
返回Future
实例,允许取消(cancel
)、查看抛出了哪个异常(exception
)以及在完成时动态添加回调(add_done_callback
)。而Pool.apply_async
返回的AsyncResult
实例没有这些功能。另一方面,由于Pool.__init__
中的initializer
/initargs
、maxtasksperchild
和上下文(context),Pool
具有更多的选项,并且公开了更多的方法。 - max