多进程池使得Numpy矩阵乘法变慢

22
所以,我正在尝试使用和,但似乎我错过了一些重要的要点。为什么版本要慢得多呢?我查看了,可以看到创建了几个进程,但它们都共用一个CPU,总计达到了〜100%。
$ cat test_multi.py 
import numpy as np
from timeit import timeit
from multiprocessing import Pool


def mmul(matrix):
    for i in range(100):
        matrix = matrix * matrix
    return matrix

if __name__ == '__main__':
    matrices = []
    for i in range(4):
        matrices.append(np.random.random_integers(100, size=(1000, 1000)))

    pool = Pool(8)
    print timeit(lambda: map(mmul, matrices), number=20)
    print timeit(lambda: pool.map(mmul, matrices), number=20)

$ python test_multi.py 
16.0265390873
19.097837925

[更新]

  • 将基准测试进程更改为timeit
  • 使用我的核心数初始化池
  • 更改计算方式,使计算更多,内存传输更少(我希望如此)

仍然没有变化。 pool版本仍然较慢,并且我可以在htop中看到只使用了一个核心,还有几个进程被生成。

[更新2]

目前,我正在阅读 @Jan-Philip Gehrcke 建议使用multiprocessing.Process()Queue的内容。 但同时我想知道:

  1. 为什么我的示例适用于tiago?可能是不起作用的原因1是什么?
  2. 在我的示例代码中是否有复制过程? 我打算让我的代码为每个线程提供一个矩阵列表中的矩阵。
  3. 我的代码是否是一个坏的示例,因为我使用了Numpy

我发现当其他人知道我的最终目标时往往会得到更好的答案: 我有很多文件,目前是以串行方式加载和处理的。 处理需要大量的CPU,因此我认为通过并行化可以获得很多收益。 我的目标是并行调用分析文件的Python函数。 此外,这个函数只是C代码的接口,我认为这会有所不同。

1 Ubuntu 12.04, Python 2.7.3, i7 860 @ 2.80 - 如需更多信息,请留言。

[更新3]

这里是Stefano示例代码的结果。 不知何故没有加速。 :/

testing with 16 matrices
base  4.27
   1  5.07
   2  4.76
   4  4.71
   8  4.78
  16  4.79
testing with 32 matrices
base  8.82
   1 10.39
   2 10.58
   4 10.73
   8  9.46
  16  9.54
testing with 64 matrices
base 17.38
   1 19.34
   2 19.62
   4 19.59
   8 19.39
  16 19.34

[更新4]回答Jan-Philip Gehrcke的评论

很抱歉我没有表达清楚。正如我在更新2中所写,我的主要目标是并行化对第三方Python库函数的串行调用。这个函数是一些C代码的接口。我被建议使用Pool,但这却不起作用,所以我尝试了一些更简单的东西,就像上面展示的带有numpy的例子。但即使在那里,我也无法实现性能提升,尽管它看起来像是'尴尬的可并行化'。因此,我认为我必须错过了一些重要的东西。这个信息是我通过这个问题和奖励正在寻找的。

[更新5]

感谢所有巨大的输入。但是阅读你们的答案只会给我带来更多的问题。出于这个原因,我将阅读关于基础知识的内容,并在我更清楚自己不知道什么的情况下创建新的SO问题。


1
我猜在这里创建进程的开销会让你崩溃。尝试使用 timeit 模块,或者至少将 pool = Pool() 函数移出计时例程。 - David Zwicker
1
我可能错了,但我怀疑大部分时间都花在你的进程之间发送矩阵上。 - NPE
1
但是你必须在不同的进程之间传递它们(即复制内存)。矩阵乘法非常快(根据您的计时大约需要6毫秒),因此这种开销是相当显著的。 - David Zwicker
关于您的实际用例:计算的输出是什么?最重要的是:它的数据大小与输入相比如何?您是将一个1000x1000矩阵转换为单个数字还是转换为另一个1000x1000矩阵? - Dr. Jan-Philip Gehrcke
Framester,我读了你的更新。我们仍需要澄清:一个输入单元有多大(例如从文件中读取1 MB),一个输入单元上的计算强度如何(例如在您的机器上使用一个CPU内核构建输出需要多长时间),输出单元有多大(也是1 MB吗?只有一个数字吗?)。如果您有例如1000个适度大小的输入单元,并且一个内核将在上面运行5分钟,直到返回单个数字作为输出-那么是的,这将是有效的并行化的主要例子。我想你的问题是不同的。 - Dr. Jan-Philip Gehrcke
显示剩余2条评论
8个回答

23

关于您所有进程都在同一个CPU上运行的事实,请看我的回答

在导入时,numpy会更改父进程的CPU亲和性,这样当您之后使用Pool时,它所生成的所有工作进程都将竞争同一个核心,而不是使用计算机上所有可用的核心。

您可以在导入numpy后调用taskset以重置CPU亲和性,使所有核心都得到利用:

import numpy as np
import os
from timeit import timeit
from multiprocessing import Pool


def mmul(matrix):
    for i in range(100):
        matrix = matrix * matrix
    return matrix

if __name__ == '__main__':

    matrices = []
    for i in range(4):
        matrices.append(np.random.random_integers(100, size=(1000, 1000)))

    print timeit(lambda: map(mmul, matrices), number=20)

    # after importing numpy, reset the CPU affinity of the parent process so
    # that it will use all cores
    os.system("taskset -p 0xff %d" % os.getpid())

    pool = Pool(8)
    print timeit(lambda: pool.map(mmul, matrices), number=20)

输出:

    $ python tmp.py                                     
    12.4765810966
    pid 29150's current affinity mask: 1
    pid 29150's new affinity mask: ff
    13.4136221409
如果你在运行此脚本时使用top观察CPU使用情况,当执行“parallel”部分时,应该会看到它使用了所有的核心。正如其他人指出的那样,在您原始的示例中,涉及数据取样、进程创建等方面的开销可能超过了并行化带来的任何潜在好处。 编辑:我怀疑单个进程似乎一直更快的部分原因是,numpy 可能有一些技巧可以加速按元素计算矩阵乘法,但当任务分布到多个核心时,它可能无法使用这些技巧。
例如,如果我只使用普通的Python列表来计算斐波那契数列,我可以通过并行化获得巨大的速度提升。同样地,如果我以不利用矢量化的方式进行逐元素相乘,对于并行版本,我也可以获得类似的速度提升:
import numpy as np
import os
from timeit import timeit
from multiprocessing import Pool

def fib(dummy):
    n = [1,1]
    for ii in xrange(100000):
        n.append(n[-1]+n[-2])

def silly_mult(matrix):
    for row in matrix:
        for val in row:
            val * val

if __name__ == '__main__':

    dt = timeit(lambda: map(fib, xrange(10)), number=10)
    print "Fibonacci, non-parallel: %.3f" %dt

    matrices = [np.random.randn(1000,1000) for ii in xrange(10)]
    dt = timeit(lambda: map(silly_mult, matrices), number=10)
    print "Silly matrix multiplication, non-parallel: %.3f" %dt

    # after importing numpy, reset the CPU affinity of the parent process so
    # that it will use all CPUS
    os.system("taskset -p 0xff %d" % os.getpid())

    pool = Pool(8)

    dt = timeit(lambda: pool.map(fib,xrange(10)), number=10)
    print "Fibonacci, parallel: %.3f" %dt

    dt = timeit(lambda: pool.map(silly_mult, matrices), number=10)
    print "Silly matrix multiplication, parallel: %.3f" %dt

输出:

$ python tmp.py
Fibonacci, non-parallel: 32.449
Silly matrix multiplication, non-parallel: 40.084
pid 29528's current affinity mask: 1
pid 29528's new affinity mask: ff
Fibonacci, parallel: 9.462
Silly matrix multiplication, parallel: 12.163

我认为这个答案的第一句话就是整个答案的核心。所有的东西都在同一个核心上运行,因此它会稍微慢一些(因为有额外的开销),而不是更快(因为没有并行化)。 - abarnert
1
实际上,我仍然相信这更可能与numpy的怪癖有关,而不仅仅是与CPU利用率有关。即使我并行化Framester的原始代码,以便它实际上利用了我所有的CPU,我仍然发现它比串行运行略慢。只有当我刻意避免numpy特别擅长的事情时,我才会从并行化中看到任何性能提升。 - ali_m
你是对的;抱歉,我没有读够,我只是在测试自己的琐碎/愚蠢的示例代码。没关系。 :) - abarnert
为了比较,您必须展示当省略 os.system("taskset -p 0xff %d" % os.getpid()) 时会发生什么。 - Dr. Jan-Philip Gehrcke
为什么?如果我省略那一行,那么(至少在我的机器上)只有一个核心会被利用,所以当然我看不到并行版本的加速。 - ali_m
显示剩余2条评论

17

在你的情况下,multiprocessing之所以如此“意外地缓慢”,是因为multiprocessingmapmap_async函数实际上通过连接父进程和子进程的管道来往返pickle Python对象。这可能需要相当长的时间。在这段时间里,子进程几乎没有任何事情可做,这就是在htop中看到的东西。在不同的系统之间,管道传输性能可能会有很大的差异,这也是为什么对于一些人来说,您的池代码比单个CPU代码更快,尽管对于您来说并非如此(其他因素可能会发挥作用,这只是一个例子,旨在解释该效应)。

您可以采取什么措施使其更快?

  1. 在符合POSIX标准的系统上不要对输入进行pickling。

    如果您在Unix上,可以通过利用POSIX的进程fork行为(写时复制内存)避免父->子通信开销:

    在父进程中创建要处理的作业输入(例如一组大矩阵),放入全局变量中。然后通过调用multiprocessing.Process()自己创建多个工作进程。在子进程中,从全局变量中获取作业输入。简单地说,这使得子进程可以访问父进程的内存而没有任何通信开销(*,下面解释)。通过例如multiprocessing.Queue将结果发送回父进程。这将节省大量通信开销,特别是如果输出与输入相比较小的情况下。这种方法在Windows上不起作用,因为在那里,multiprocessing.Process()会创建一个完全新的Python进程,它不继承父进程的状态。

  2. 利用numpy多线程。 根据您的实际计算任务,涉及multiprocessing可能根本不会有所帮助。如果您自己编译numpy并启用OpenMP指令,则对大矩阵的操作可能会非常高效地多线程化(并分布在许多CPU核心上;GIL在此处不是限制因素)。基本上,这是您可以在numpy / scipy上获得的使用多个CPU核心最有效的用法。

*一般情况下,子进程无法直接访问父进程的内存。但是,在fork()之后,父进程和子进程处于等价状态。把整个父进程的内存复制到RAM中的另一个位置是很愚蠢的。这就是为什么写时复制原则会起作用。只要子进程不更改其内存状态,它实际上就可以访问父进程的内存。仅在修改时,相应的位和片段才会复制到子进程的内存空间中。

重大修改:

让我添加一段代码,它使用多个工作进程压缩大量的输入数据,并遵循“1.不要在符合POSIX标准的系统上使用pickle”这一建议。此外,传回给工作管理器(父进程)的信息量非常低。此示例的重型计算部分是单个值分解。它可以充分利用OpenMP。我已经多次执行了该示例:

  • 一次使用1、2或4个工作进程和OMP_NUM_THREADS=1,因此每个工作进程创建最大负载100%。在那里,所提到的工作者数量计算时间比例行为几乎是线性的,净加速比因涉及的工作者数量而异。
  • 一次使用1、2或4个工作进程和OMP_NUM_THREADS=4,以便每个进程通过生成4个OpenMP线程来创建最大负载400%。我的计算机有16个真实核心,因此4个进程每个最大负载400%将几乎发挥出机器的最大性能。扩展不再完全线性,速度提高的比例不是涉及的工作者数量,但与OMP_NUM_THREADS=1相比,绝对计算时间仍然显着减少,而且随着工作进程数量的增加,时间仍然显着减少。
  • 一次使用更大的输入数据、4个核心和OMP_NUM_THREADS=4。它导致平均的系统负载为1253%。
  • 一次使用与上一个相同的设置,但是OMP_NUM_THREADS=5。它导致平均的系统负载为1598%,这表明我们从那台16核机器中得到了一切。但是,实际的计算墙壁时间与后者相比并没有改善。

代码:

import os
import time
import math
import numpy as np
from numpy.linalg import svd as svd
import multiprocessing


# If numpy is compiled for OpenMP, then make sure to control
# the number of OpenMP threads via the OMP_NUM_THREADS environment
# variable before running this benchmark.


MATRIX_SIZE = 1000
MATRIX_COUNT = 16


def rnd_matrix():
    offset = np.random.randint(1,10)
    stretch = 2*np.random.rand()+0.1
    return offset + stretch * np.random.rand(MATRIX_SIZE, MATRIX_SIZE)


print "Creating input matrices in parent process."
# Create input in memory. Children access this input.
INPUT = [rnd_matrix() for _ in xrange(MATRIX_COUNT)]


def worker_function(result_queue, worker_index, chunk_boundary):
    """Work on a certain chunk of the globally defined `INPUT` list.
    """
    result_chunk = []
    for m in INPUT[chunk_boundary[0]:chunk_boundary[1]]:
        # Perform single value decomposition (CPU intense).
        u, s, v = svd(m)
        # Build single numeric value as output.
        output =  int(np.sum(s))
        result_chunk.append(output)
    result_queue.put((worker_index, result_chunk))


def work(n_workers=1):
    def calc_chunksize(l, n):
        """Rudimentary function to calculate the size of chunks for equal 
        distribution of a list `l` among `n` workers.
        """
        return int(math.ceil(len(l)/float(n)))

    # Build boundaries (indices for slicing) for chunks of `INPUT` list.
    chunk_size = calc_chunksize(INPUT, n_workers)
    chunk_boundaries = [
        (i, i+chunk_size) for i in xrange(0, len(INPUT), chunk_size)]

    # When n_workers and input list size are of same order of magnitude,
    # the above method might have created less chunks than workers available. 
    if n_workers != len(chunk_boundaries):
        return None

    result_queue = multiprocessing.Queue()
    # Prepare child processes.
    children = []
    for worker_index in xrange(n_workers):
        children.append(
            multiprocessing.Process(
                target=worker_function,
                args=(
                    result_queue,
                    worker_index,
                    chunk_boundaries[worker_index],
                    )
                )
            )

    # Run child processes.
    for c in children:
        c.start()

    # Create result list of length of `INPUT`. Assign results upon arrival.
    results = [None] * len(INPUT)

    # Wait for all results to arrive.
    for _ in xrange(n_workers):
        worker_index, result_chunk = result_queue.get(block=True)
        chunk_boundary = chunk_boundaries[worker_index]
        # Store the chunk of results just received to the overall result list.
        results[chunk_boundary[0]:chunk_boundary[1]] = result_chunk

    # Join child processes (clean up zombies).
    for c in children:
        c.join()
    return results


def main():
    durations = []
    n_children = [1, 2, 4]
    for n in n_children:
        print "Crunching input with %s child(ren)." % n
        t0 = time.time()
        result = work(n)
        if result is None:
            continue
        duration = time.time() - t0
        print "Result computed by %s child process(es): %s" % (n, result)
        print "Duration: %.2f s" % duration
        durations.append(duration)
    normalized_durations = [durations[0]/d for d in durations]
    for n, normdur in zip(n_children, normalized_durations):
        print "%s-children speedup: %.2f" % (n, normdur)


if __name__ == '__main__':
    main()

输出结果:

$ export OMP_NUM_THREADS=1
$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 1 child(ren).
Result computed by 1 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 16.66 s
Crunching input with 2 child(ren).
Result computed by 2 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 8.27 s
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 4.37 s
1-children speedup: 1.00
2-children speedup: 2.02
4-children speedup: 3.81
48.75user 1.75system 0:30.00elapsed 168%CPU (0avgtext+0avgdata 1007936maxresident)k
0inputs+8outputs (1major+809308minor)pagefaults 0swaps

$ export OMP_NUM_THREADS=4
$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 1 child(ren).
Result computed by 1 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 8.62 s
Crunching input with 2 child(ren).
Result computed by 2 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 4.92 s
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 2.95 s
1-children speedup: 1.00
2-children speedup: 1.75
4-children speedup: 2.92
106.72user 3.07system 0:17.19elapsed 638%CPU (0avgtext+0avgdata 1022240maxresident)k
0inputs+8outputs (1major+841915minor)pagefaults 0swaps

$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [21762, 26806, 10148, 22947, 20900, 8161, 20168, 17439, 23497, 26360, 6789, 11216, 12769, 23022, 26221, 20480, 19140, 13757, 23692, 19541, 24644, 21251, 21000, 21687, 32187, 5639, 23314, 14678, 18289, 12493, 29766, 14987, 12580, 17988, 20853, 4572, 16538, 13284, 18612, 28617, 19017, 23145, 11183, 21018, 10922, 11709, 27895, 8981]
Duration: 12.69 s
4-children speedup: 1.00
174.03user 4.40system 0:14.23elapsed 1253%CPU (0avgtext+0avgdata 2887456maxresident)k
0inputs+8outputs (1major+1211632minor)pagefaults 0swaps

$ export OMP_NUM_THREADS=5
$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [19528, 17575, 21792, 24303, 6352, 22422, 25338, 18183, 15895, 19644, 20161, 22556, 24657, 30571, 13940, 18891, 10866, 21363, 20585, 15289, 6732, 10851, 11492, 29146, 12611, 15022, 18967, 25171, 10759, 27283, 30413, 14519, 25456, 18934, 28445, 12768, 28152, 24055, 9285, 26834, 27731, 33398, 10172, 22364, 12117, 14967, 18498, 8111]
Duration: 13.08 s
4-children speedup: 1.00
230.16user 5.98system 0:14.77elapsed 1598%CPU (0avgtext+0avgdata 2898640maxresident)k
0inputs+8outputs (1major+1219611minor)pagefaults 0swaps

关于 point2 的额外问题:http://stackoverflow.com/questions/15531556/how-to-recompile-numpy-with-enabled-openmp-directives - Framester
+1: 最可信的解释。除了在numpy中允许OpenMP外,如果有的话,还应该使用供应商blas库。 - Stefano M
我想知道为什么在numpy中并行矩阵操作时没有明显的性能提升,这可能是因为numpy使用外部BLAS和LAPACK库,这些库通常编译为同时使用多个核心。如果您尝试并行运行像svd(使用LAPACK)这样的东西,也许每个工作进程仍然会像执行在多个核心上一样运行,并且会执行“次优”操作,例如写入彼此的缓存等。 - ali_m
@ali_m:在第一个例子中,我们看到了理想的扩展(1个子进程加速比:1.00,2个子进程加速比:2.02,4个子进程加速比:3.81)。我猜你说的是:计算4个子进程/OMP_NUM_THREADS=1的持续时间:4.37秒,而使用OMP_NUM_THREADS=4只需要2.95秒。是的,这远远不是4倍的变化(理想情况下应该是这样)。然而,这是可以预料的。因为对于巨大的矩阵进行SVD操作涉及到在RAM、缓存和寄存器之间传输大量数据,相应的管道(特别是CPU和RAM之间的Hypertransport/Quickpath/FSB)是瓶颈。非常简单。 - Dr. Jan-Philip Gehrcke
感谢提供的示例代码。不幸的是,有时代码会在“使用1个子进程处理输入”后停止运行,并永远停留在那里。但我还没有检查过我的numpy版本是否支持OMP。 - Framester
希望能够看到今天的情况更新,包括Python 3和新的libgomp(以及可能的其他OpenMP实现)。 - Dima Pasechnik

4

你的代码是正确的。我在我的系统上运行它(有2个核心,超线程),并获得了以下结果:

$ python test_multi.py 
30.8623809814
19.3914041519

我查看了进程,正如预期的那样,并行部分显示有几个进程工作接近100%。这一定是您的系统或Python安装中的某些问题。


感谢您尝试我的代码并进行评估。您有什么想法是出了什么问题,或者我可以搜索什么? - Framester
不确定哪里出了问题。您使用的是什么系统?我建议尝试除Pool之外的其他multiprocessing方法,甚至可以使用不同的进程在共享数组的各个部分上工作。 - tiago

3

解决方案

在进行任何计算之前,设置以下环境变量(对于一些较早版本的numpy,您可能需要在执行import numpy之前设置它们):

os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["VECLIB_MAXIMUM_THREADS"] = "1"
os.environ["NUMEXPR_NUM_THREADS"] = "1"

工作原理

NumPy的实现已经使用了多线程优化库,如OpenMP、MKL或OpenBLAS等。这就是为什么我们自己实现多进程并没有看到很大的改进。更糟糕的是,我们会遇到过多的线程。例如,如果我的机器有8个CPU核心,当我编写单进程代码时,NumPy可能会使用8个线程进行计算。然后我使用多进程启动8个进程,就会得到64个线程。这并不是有益的,线程之间的上下文切换和其他开销可能会花费更多的时间。通过设置上述环境变量,我们将每个进程的线程数限制为1,从而获得最有效的总线程数。

示例代码

from timeit import timeit
from multiprocessing import Pool
import sys
import os

import numpy as np

def matmul(_):
    matrix = np.ones(shape=(1000, 1000))
    _ = np.matmul(matrix, matrix)

def mixed(_):
    matrix = np.ones(shape=(1000, 1000))
    _ = np.matmul(matrix, matrix)

    s = 0
    for i in range(1000000):
        s += i

if __name__ == '__main__':
    if sys.argv[1] == "--set-num-threads":
        os.environ["OMP_NUM_THREADS"] = "1"
        os.environ["MKL_NUM_THREADS"] = "1"
        os.environ["OPENBLAS_NUM_THREADS"] = "1"
        os.environ["VECLIB_MAXIMUM_THREADS"] = "1"
        os.environ["NUMEXPR_NUM_THREADS"] = "1"

    if sys.argv[2] == "matmul":
        f = matmul
    elif sys.argv[2] == "mixed":
        f = mixed

    print("Serial:")
    print(timeit(lambda: list(map(f, [0] * 8)), number=20))

    with Pool(8) as pool:
        print("Multiprocessing:")
        print(timeit(lambda: pool.map(f, [0] * 8), number=20))

我在一个拥有8个vCPU(不一定意味着8个核心)的AWS p3.2xlarge实例上测试了这段代码:

$ python test_multi.py --no-set-num-threads matmul
Serial:
3.3447616740000115
Multiprocessing:
3.5941055110000093

$ python test_multi.py --set-num-threads matmul
Serial:
9.464500446000102
Multiprocessing:
2.570238267999912

在设置环境变量之前,串行版本和多进程版本没有太大的区别,都大约是3秒钟,通常情况下,多进程版本比较慢,就像OP所演示的那样。设置线程数后,我们发现串行版本花费了9.46秒,变得更加缓慢!这证明即使只使用单个进程,numpy也在利用多线程。多进程版本花费了2.57秒,稍有改善,这可能是因为在我的实现中节省了跨线程数据传输时间。
这个例子并没有展示出多进程的强大之处,因为numpy已经在使用并行化。当正常的Python密集CPU计算与numpy操作混合时,多进程最有益。例如:
$ python test_multi.py --no-set-num-threads mixed
Serial:
12.380275611000116
Multiprocessing:
8.190792100999943

$ python test_multi.py --set-num-threads mixed
Serial:
18.512066430999994
Multiprocessing:
4.8058130150000125

在这里,将多进程的线程数设置为1是最快的。

备注:这也适用于一些其他CPU计算库,如PyTorch。


2

默认情况下,Pool 只使用 n 个进程,其中 n 是您计算机上的 CPU 数量。您需要指定要使用多少个进程,例如 Pool(5)

在此处查看更多信息


2

测量算术吞吐量是一项非常困难的任务:基本上您的测试用例过于简单,我看到很多问题。

首先,您正在测试整数算术:是否有特殊原因?使用浮点数可以获得可比较的结果,可以跨越许多不同的架构。

其次,matrix = matrix*matrix会覆盖输入参数(矩阵是通过引用而不是值传递的),并且每个样本都必须处理不同的数据...

最后,应该在更广泛的问题规模和工人数量范围内进行测试,以掌握一般趋势。

因此,这是我修改后的测试脚本。

import numpy as np
from timeit import timeit
from multiprocessing import Pool

def mmul(matrix):
    mymatrix = matrix.copy()
    for i in range(100):
        mymatrix *= mymatrix
    return mymatrix

if __name__ == '__main__':

    for n in (16, 32, 64):
        matrices = []
        for i in range(n):
            matrices.append(np.random.random_sample(size=(1000, 1000)))

        stmt = 'from __main__ import mmul, matrices'
        print 'testing with', n, 'matrices'
        print 'base',
        print '%5.2f' % timeit('r = map(mmul, matrices)', setup=stmt, number=1)

        stmt = 'from __main__ import mmul, matrices, pool'
        for i in (1, 2, 4, 8, 16):
            pool = Pool(i)
            print "%4d" % i, 
            print '%5.2f' % timeit('r = pool.map(mmul, matrices)', setup=stmt, number=1)
            pool.close()
            pool.join()

我的结果如下:

$ python test_multi.py 
testing with 16 matrices
base  5.77
   1  6.72
   2  3.64
   4  3.41
   8  2.58
  16  2.47
testing with 32 matrices
base 11.69
   1 11.87
   2  9.15
   4  5.48
   8  4.68
  16  3.81
testing with 64 matrices
base 22.36
   1 25.65
   2 15.60
   4 12.20
   8  9.28
  16  9.04

[更新]我在另一台电脑上运行了这个示例,并得到了相同的减速:
testing with 16 matrices
base  2.42
   1  2.99
   2  2.64
   4  2.80
   8  2.90
  16  2.93
testing with 32 matrices
base  4.77
   1  6.01
   2  5.38
   4  5.76
   8  6.02
  16  6.03
testing with 64 matrices
base  9.92
   1 12.41
   2 10.64
   4 11.03
   8 11.55
  16 11.59

我必须承认我不知道是谁的错(nump、python、编译器、内核)...

谢谢,但我收到以下错误信息:'Exception RuntimeError: RuntimeError('cannot join current thread',) in <Finalize object, dead> ignored' - Framester
@Framester 请在 pool.close() 后面添加 pool.join();如果运行时间短,您可以增加 timeit 中的迭代次数。 - Stefano M
1
除了代码,没有人可以责怪! :) 我在一台现代的16核E5-2650系统上尝试了它。我观察到mp池大小为2和4时加速。超过这个大小,执行时间又变得更糟了。这段代码的并行化方法远非高效。Stefano:你在一台计算机上观察到的加速与涉及的核心数根本不是线性的。一个合理的理论解释你两台计算机之间的差异:在第一个示例中,单核速度和管道传输性能之间的比率小于第二个示例中的比率。 - Dr. Jan-Philip Gehrcke

1

由于您提到有很多文件,我建议采用以下解决方案:

  • 制作一个文件名列表。
  • 编写一个函数来加载和处理以输入参数命名的单个文件。
  • 使用Pool.map()将该函数应用于文件列表。

由于每个实例现在都会加载自己的文件,因此传递的唯一数据是文件名,而不是(潜在的大型)numpy数组。


1
我还注意到,当我在Pool.map()函数中运行numpy矩阵乘法时,在某些机器上它的速度要慢得多。我的目标是使用Pool.map()并在我的机器的每个核心上运行一个进程来并行化我的工作。当事情运行得很快时,numpy矩阵乘法只是并行执行的整体工作的一小部分。当我查看进程的CPU使用率时,我可以看到每个进程可以在运行缓慢的机器上使用400+%的CPU,但始终<=100%在运行速度快的机器上。对我来说,解决方案是停止numpy进行多线程操作。事实证明,numpy被设置为在我的Pool.map()运行缓慢的机器上进行多线程操作。显然,如果您已经使用Pool.map()进行并行化,则让numpy也并行化只会创建干扰。我只需在运行Python代码之前调用export MKL_NUM_THREADS=1,就可以在任何地方快速运行。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接