在你的情况下,multiprocessing
之所以如此“意外地缓慢”,是因为multiprocessing
的map
和map_async
函数实际上通过连接父进程和子进程的管道来往返pickle Python对象。这可能需要相当长的时间。在这段时间里,子进程几乎没有任何事情可做,这就是在htop
中看到的东西。在不同的系统之间,管道传输性能可能会有很大的差异,这也是为什么对于一些人来说,您的池代码比单个CPU代码更快,尽管对于您来说并非如此(其他因素可能会发挥作用,这只是一个例子,旨在解释该效应)。
您可以采取什么措施使其更快?
在符合POSIX标准的系统上不要对输入进行pickling。
如果您在Unix上,可以通过利用POSIX的进程fork行为(写时复制内存)避免父->子通信开销:
在父进程中创建要处理的作业输入(例如一组大矩阵),放入全局变量中。然后通过调用multiprocessing.Process()
自己创建多个工作进程。在子进程中,从全局变量中获取作业输入。简单地说,这使得子进程可以访问父进程的内存而没有任何通信开销(*,下面解释)。通过例如multiprocessing.Queue
将结果发送回父进程。这将节省大量通信开销,特别是如果输出与输入相比较小的情况下。这种方法在Windows上不起作用,因为在那里,multiprocessing.Process()
会创建一个完全新的Python进程,它不继承父进程的状态。
利用numpy多线程。
根据您的实际计算任务,涉及multiprocessing
可能根本不会有所帮助。如果您自己编译numpy并启用OpenMP指令,则对大矩阵的操作可能会非常高效地多线程化(并分布在许多CPU核心上;GIL在此处不是限制因素)。基本上,这是您可以在numpy / scipy上获得的使用多个CPU核心最有效的用法。
*一般情况下,子进程无法直接访问父进程的内存。但是,在fork()
之后,父进程和子进程处于等价状态。把整个父进程的内存复制到RAM中的另一个位置是很愚蠢的。这就是为什么写时复制原则会起作用。只要子进程不更改其内存状态,它实际上就可以访问父进程的内存。仅在修改时,相应的位和片段才会复制到子进程的内存空间中。
重大修改:
让我添加一段代码,它使用多个工作进程压缩大量的输入数据,并遵循“1.不要在符合POSIX标准的系统上使用pickle”这一建议。此外,传回给工作管理器(父进程)的信息量非常低。此示例的重型计算部分是单个值分解。它可以充分利用OpenMP。我已经多次执行了该示例:
- 一次使用1、2或4个工作进程和
OMP_NUM_THREADS=1
,因此每个工作进程创建最大负载100%。在那里,所提到的工作者数量计算时间比例行为几乎是线性的,净加速比因涉及的工作者数量而异。
- 一次使用1、2或4个工作进程和
OMP_NUM_THREADS=4
,以便每个进程通过生成4个OpenMP线程来创建最大负载400%。我的计算机有16个真实核心,因此4个进程每个最大负载400%将几乎发挥出机器的最大性能。扩展不再完全线性,速度提高的比例不是涉及的工作者数量,但与OMP_NUM_THREADS=1
相比,绝对计算时间仍然显着减少,而且随着工作进程数量的增加,时间仍然显着减少。
- 一次使用更大的输入数据、4个核心和
OMP_NUM_THREADS=4
。它导致平均的系统负载为1253%。
- 一次使用与上一个相同的设置,但是
OMP_NUM_THREADS=5
。它导致平均的系统负载为1598%,这表明我们从那台16核机器中得到了一切。但是,实际的计算墙壁时间与后者相比并没有改善。
代码:
import os
import time
import math
import numpy as np
from numpy.linalg import svd as svd
import multiprocessing
MATRIX_SIZE = 1000
MATRIX_COUNT = 16
def rnd_matrix():
offset = np.random.randint(1,10)
stretch = 2*np.random.rand()+0.1
return offset + stretch * np.random.rand(MATRIX_SIZE, MATRIX_SIZE)
print "Creating input matrices in parent process."
INPUT = [rnd_matrix() for _ in xrange(MATRIX_COUNT)]
def worker_function(result_queue, worker_index, chunk_boundary):
"""Work on a certain chunk of the globally defined `INPUT` list.
"""
result_chunk = []
for m in INPUT[chunk_boundary[0]:chunk_boundary[1]]:
u, s, v = svd(m)
output = int(np.sum(s))
result_chunk.append(output)
result_queue.put((worker_index, result_chunk))
def work(n_workers=1):
def calc_chunksize(l, n):
"""Rudimentary function to calculate the size of chunks for equal
distribution of a list `l` among `n` workers.
"""
return int(math.ceil(len(l)/float(n)))
chunk_size = calc_chunksize(INPUT, n_workers)
chunk_boundaries = [
(i, i+chunk_size) for i in xrange(0, len(INPUT), chunk_size)]
if n_workers != len(chunk_boundaries):
return None
result_queue = multiprocessing.Queue()
children = []
for worker_index in xrange(n_workers):
children.append(
multiprocessing.Process(
target=worker_function,
args=(
result_queue,
worker_index,
chunk_boundaries[worker_index],
)
)
)
for c in children:
c.start()
results = [None] * len(INPUT)
for _ in xrange(n_workers):
worker_index, result_chunk = result_queue.get(block=True)
chunk_boundary = chunk_boundaries[worker_index]
results[chunk_boundary[0]:chunk_boundary[1]] = result_chunk
for c in children:
c.join()
return results
def main():
durations = []
n_children = [1, 2, 4]
for n in n_children:
print "Crunching input with %s child(ren)." % n
t0 = time.time()
result = work(n)
if result is None:
continue
duration = time.time() - t0
print "Result computed by %s child process(es): %s" % (n, result)
print "Duration: %.2f s" % duration
durations.append(duration)
normalized_durations = [durations[0]/d for d in durations]
for n, normdur in zip(n_children, normalized_durations):
print "%s-children speedup: %.2f" % (n, normdur)
if __name__ == '__main__':
main()
输出结果:
$ export OMP_NUM_THREADS=1
$ /usr/bin/time python test2.py
Creating input matrices in parent process.
Crunching input with 1 child(ren).
Result computed by 1 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 16.66 s
Crunching input with 2 child(ren).
Result computed by 2 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 8.27 s
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 4.37 s
1-children speedup: 1.00
2-children speedup: 2.02
4-children speedup: 3.81
48.75user 1.75system 0:30.00elapsed 168%CPU (0avgtext+0avgdata 1007936maxresident)k
0inputs+8outputs (1major+809308minor)pagefaults 0swaps
$ export OMP_NUM_THREADS=4
$ /usr/bin/time python test2.py
Creating input matrices in parent process.
Crunching input with 1 child(ren).
Result computed by 1 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 8.62 s
Crunching input with 2 child(ren).
Result computed by 2 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 4.92 s
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 2.95 s
1-children speedup: 1.00
2-children speedup: 1.75
4-children speedup: 2.92
106.72user 3.07system 0:17.19elapsed 638%CPU (0avgtext+0avgdata 1022240maxresident)k
0inputs+8outputs (1major+841915minor)pagefaults 0swaps
$ /usr/bin/time python test2.py
Creating input matrices in parent process.
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [21762, 26806, 10148, 22947, 20900, 8161, 20168, 17439, 23497, 26360, 6789, 11216, 12769, 23022, 26221, 20480, 19140, 13757, 23692, 19541, 24644, 21251, 21000, 21687, 32187, 5639, 23314, 14678, 18289, 12493, 29766, 14987, 12580, 17988, 20853, 4572, 16538, 13284, 18612, 28617, 19017, 23145, 11183, 21018, 10922, 11709, 27895, 8981]
Duration: 12.69 s
4-children speedup: 1.00
174.03user 4.40system 0:14.23elapsed 1253%CPU (0avgtext+0avgdata 2887456maxresident)k
0inputs+8outputs (1major+1211632minor)pagefaults 0swaps
$ export OMP_NUM_THREADS=5
$ /usr/bin/time python test2.py
Creating input matrices in parent process.
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [19528, 17575, 21792, 24303, 6352, 22422, 25338, 18183, 15895, 19644, 20161, 22556, 24657, 30571, 13940, 18891, 10866, 21363, 20585, 15289, 6732, 10851, 11492, 29146, 12611, 15022, 18967, 25171, 10759, 27283, 30413, 14519, 25456, 18934, 28445, 12768, 28152, 24055, 9285, 26834, 27731, 33398, 10172, 22364, 12117, 14967, 18498, 8111]
Duration: 13.08 s
4-children speedup: 1.00
230.16user 5.98system 0:14.77elapsed 1598%CPU (0avgtext+0avgdata 2898640maxresident)k
0inputs+8outputs (1major+1219611minor)pagefaults 0swaps
timeit
模块,或者至少将pool = Pool()
函数移出计时例程。 - David Zwicker