调用numpy共轭函数时进行多进程调用时出现奇怪行为

5

这个脚本评估了numpy.conjugate例程在不同大小的矩阵上使用不同数量并行进程的运行时间,并记录相应的运行时间。矩阵形状仅在其第一维(从1,64,64到256,64,64)上有所变化。共轭调用始终在1,64,64子矩阵上进行,以确保正在处理的部分适合我的系统中的L2缓存(每个核心256 KB,我的情况下是25MB的L3缓存)。运行该脚本会产生以下图表(带有略有不同的轴标签和颜色)。

enter image description here

正如您所看到的,从大约100,64,64的形状开始,运行时取决于使用的并行进程数。这是什么原因?或者为什么对于小于(100,64,64)的矩阵,对进程数量的依赖性很低?我的主要目标是找到一种修改此脚本的方法,使得对于任意大小的矩阵'a',运行时尽可能独立于进程数。在20个进程的情况下:所有'a'矩阵最多需要: 20 * 16 * 256 * 64 * 64字节=320MB;所有'b'子矩阵最多需要: 20 * 16 * 1 * 64 * 64字节=1.25MB。因此,所有子矩阵同时适合L3缓存,并且分别适合我的CPU每个核心的L2缓存。我只对这些测试使用了物理核心,没有使用超线程。以下是脚本:
from multiprocessing import Process, Queue
import time
import numpy as np
import os
from matplotlib import pyplot as plt
os.environ['OPENBLAS_NUM_THREADS'] = '1'
os.environ['MKL_NUM_THREADS'] = '1'

def f(q,size):
    a = np.random.rand(size,64,64) + 1.j*np.random.rand(size,64,64)
    start = time.time()
    n=a.shape[0]
    for i in range(20):
        for b in a:
            b.conj()
    duration = time.time()-start
    q.put(duration)

def speed_test(number_of_processes=1,size=1):
    number_of_processes = number_of_processes
    process_list=[]
    queue = Queue()
    #Start processes
    for p_id in range(number_of_processes):
        p = Process(target=f,args=(queue,size))
        process_list.append(p)
        p.start()
    #Wait until all processes are finished
    for p in process_list:
        p.join()

    output = []
    while queue.qsize() != 0:
        output.append(queue.get())
    return np.mean(output)

if __name__ == '__main__':
    processes=np.arange(1,20,3)
    data=[[] for i in processes]
    for p_id,p in enumerate(processes):
        for size_0 in range(1,257):
            data[p_id].append(speed_test(number_of_processes=p,size=size_0))

    fig,ax = plt.subplots()
    for d in data:
        ax.plot(d)
    ax.set_xlabel('Matrix Size: 1-256,64,64')
    ax.set_ylabel('Runtime in seconds')

    fig.savefig('result.png')

我知道这是一次性代码,但在加入进程之前,您应该从队列中获取所有数据。q.put 可能会被阻塞,等待另一侧的 q.get,而另一侧正在等待 p.join,而 p.join 又被 q.put 阻塞。此外,q.qsizeq.empty 大多只存在于与非多进程 Queue 库的接口兼容性。在某些情况下(管理线程的竞争条件),它不可靠。mp.manager 队列没有这个问题,因为它们是非 mp 队列的代理(尽管这意味着它们也更慢)。 - Aaron
感谢您的输入。标准方法是从每个进程发送一个额外的“DONE”消息,以便知道何时完成所有操作? - Tim
要么知道您有多少个工作人员,并期望该数量的“完成”消息,要么知道您有多少个输出,并期望那么多的值。通过为“get”、“put”和“join”提供超时,您还可以在技术上使方法变得安全。最好的做法是“永远不要等待永远”。 - Aaron
2个回答

2
这个问题至少涉及两种复杂效应的组合:cache-thrashing和frequency-scaling。我能在我的6核i5-9600KF处理器上重现这个问题。

Cache thrashing

最大的影响来自于cache-thrashing问题。可以通过查看RAM吞吐量轻松跟踪它。事实上,对于1个进程,吞吐量为4 GiB/s,对于6个进程,吞吐量为20 GiB/s。读取吞吐量与写入吞吐量相似,因此吞吐量是对称的。我的RAM能够达到高达约40 GiB/s,但通常只有混合读/写模式下的32 GiB/s左右。这意味着RAM压力非常大。这种使用情况通常有两种情况:
  • 从RAM中读取/写回数组,因为cache不够大;
  • 访问内存中不同位置的访问次数很多,但它们映射在L3中的同一缓存行中。
乍一看,第一种情况更有可能发生,因为数组是连续的而且相当大(不幸的是另一个效应也会发生,请参见下文)。实际上,主要问题是a数组太大了,无法放入L3中。实际上,当大小大于128时,a会占用超过128 * 64 * 64 * 8 * 2 = 8 MiB / process的空间。实际上,a由两个必须读取的数组构建,因此所需的缓存空间比这还要大三倍:即> 24 MiB / process。问题是所有进程都分配了相同数量的内存,因此进程数越多,a占用的累积空间就越大。当累积空间大于cache时,处理器需要将数据写入RAM并读回,这很慢。
事实上,情况更糟:进程没有完全同步,因此一些进程可能由于填充a而刷新其他进程所需的数据。
此外,b.conj()创建一个新数组,可能不是每次分配在同一内存分配中,因此处理器还需要将数据写回。此效应取决于所使用的低级分配器。可以使用out参数来解决此问题。话虽如此,在我的机器上问题不太明显(使用out参数时,6个进程速度快2%,1个进程速度相同)。
简而言之,更多的进程访问更多的数据,全局数据量不适合CPU缓存,导致性能下降,因为数组需要反复重新加载。

Frequency scaling

现代处理器采用频率缩放(如Turbo-Boost)来加速(相当)顺序应用程序,但在处理计算时它们不能为所有核心使用相同的频率,因为处理器有一个有限的功率预算。这导致了较低的理论可扩展性。事实上,所有进程都在做相同的工作,因此N个进程在N个核心上运行所需的时间不是1个进程在1个核心上运行的N倍。
当创建1个进程时,两个核心的操作频率为4550-4600 MHz(其他核心的频率为3700 MHz),而当运行6个进程时,所有核心的操作频率为4300 MHz。这足以解释我机器上高达7%的差异。
你几乎无法控制Turbo频率,但可以完全禁用它或控制频率,使最小-最大频率都设置为基础频率。请注意,处理器在病理情况下(即出现关键温度时进行限制)可以使用更低的频率。通过调整频率,我发现性能有所改善(实际上提高了7〜10%)。

其他影响

当进程数等于核心数时,操作系统对进程进行的上下文切换比留出一个核心用于其他任务时更多。上下文切换会稍微降低进程的性能。当所有核心都被分配时,操作系统调度程序很难避免不必要的迁移,因此这种情况特别明显。这通常发生在运行许多进程但计算机性能不高的PC上。在我的机器上,这种开销大约为5-10%。
请注意,进程数不应超过核心数(也不应超过超线程)。超出此限制后,性能几乎无法预测,并且会出现许多复杂的开销(主要是调度问题)。

我也期望频率缩放能做到这一点(尽管我没有在我的机器上检查过是否属实)。对于缓存,不需要将a加载到缓存中,因为代码中会读取它。事实上,Numpy会加载b,但是ba的一个移动部分,所以最终会完全加载a。处理器使用集合关联缓存,并不知道已经加载的a的前面部分可以被替换。由于已加载值的虚拟地址是递增的,处理器在L1缓存中连续加载新数据时,可能会覆盖重要数据。 - Jérôme Richard
1
我认为你在分析中忽略了一个重要的问题,即每个进程都需要完全重新加载20次a。对于1个进程,数据肯定可以完全适应缓存,因此不需要重新加载,下一次迭代可能会显着加快。实际上,数据理论上可以从多个运行中重复使用(当进程数相同时且大小不变时,在我的机器上就是这种情况)。 - Jérôme Richard
测量内存带宽并不简单,并且文档记录不完整,所以这不是一个新手问题 ;)。 这需要使用依赖于架构的硬件计数器(基本上从SandyBridge处理器到Skylake甚至到Skylake-SP等名称更改,更不用说AMD Zen)。 并非所有处理器都有此类计数器。 此外,您需要完全的根访问权限才能获取此功能(出于安全原因,由于旁道攻击和代码分析)。 从技术上讲,在获得特殊访问/权限的情况下,您可以在非root的情况下访问计数器(但这很复杂)。 - Jérôme Richard
基本上,你需要使用Linux的perf工具:首先列出硬件计数器,然后搜索与内存相关的计数器,例如uncore IMC计数器。如果没有这样的计数器,您可以分析L3缓存未命中的计数器(但这不像分析IMC计数器那样精确)。一旦找到,您可以使用perf stat -e THE_COUNTER_1,THE_COUNTER_2,...,THE_COUNTER_N YOUR_APPLICATION。请注意,您不能精确地读取太多的计数器:有一个限制(例如4个计数器),操作系统将需要进行某种(不精确的)复用。然后,您可以通过时间除以大小来计算。 - Jérôme Richard
分析的部分需要足够长的时间(如果可能,稳定超过100毫秒),以便结果有用。因此,您可能需要调整基准测试(我已经这样做了,以在我的机器上进行分析)。请注意,您还可以在运行程序时定期分析数据,以及许多其他高级功能。有关更多信息,请考虑阅读此处的教程:https://perf.wiki.kernel.org/index.php/Tutorial。 - Jérôme Richard
显示剩余3条评论

1
我会翻译这段英文:

我将接受Jérôme的答案。
对于可能会问到的感兴趣的读者:
为什么你要将大的numpy数组细分,并且只处理子矩阵?
答案是,这样更快!

让我们考虑一个复杂的矩阵'a',它有128MB大(太大了无法放入缓存中)。

对于单个进程,可以快速检查:

import numpy as np
import timeit
a=np.random.rand(8192,32,32)+1.j*np.random.rand(8192,32,32)
print(timeit.timeit('a.conj()',number=100,globals=globals()))
print(timeit.timeit('for i in range(0,8192,8): a[i:i+8].conj()',number = 100 ,globals = globals()))

第二次迭代128KB子矩阵比第一次更快完成(如果128KB位于您的L1和L2缓存大小之间)。
在以下图表中,我将显示在两台测试机上计算时间与子矩阵大小的关系。每个测试用例有两个图表,分别涵盖16KB-1024KB(使用16KB步长)和0.5MB-64MB(使用0.5MB步长)的子矩阵大小范围。
Machine I:2颗Intel Xenon E5-2640 v3处理器(L1i=L1d=32KB,L2=256KB,L3=20MB,10个核心) enter image description here enter image description here Machine II:2颗Intel Xenon E5-2640 v4处理器(L1i=L1d=32KB,L2=256KB,L3=50MB,20个核心) enter image description here enter image description here 计算速度最快的子矩阵大小(64KB)恰好是测试机器上每个CPU的组合L1缓存大小,这很可疑。在组合L2缓存值(512KB)处没有特殊情况发生。一旦所有并行运行进程的组合子矩阵大小超过可用CPU之一的L3缓存,计算时间就会迅速增加。(例如,机器1,19个进程,在约1MB处,机器2,37个进程,在约1.3MB处)。以下是绘图脚本:
from multiprocessing import Process, Queue
import time
import numpy as np
import timeit
from matplotlib import pyplot as plt
import os
os.environ['OPENBLAS_NUM_THREADS'] = '1'
os.environ['MKL_NUM_THREADS'] = '1'
m_shape =(8192,32,32)
def f(q,size):
    a = np.random.rand(*m_shape) + 1.j*np.random.rand(*m_shape)
    start = time.time()
    n=a.shape[0]    
    for i in range(0,n,size):
        a[i:i+size].conj()
    duration = time.time()-start
    q.put(duration)

def speed_test(number_of_processes=1,size=1):
    number_of_processes = number_of_processes
    process_list=[]
    queue = Queue()
    #Start processes
    for p_id in range(number_of_processes):
        p = Process(target=f,args=(queue,size))
        process_list.append(p)
        p.start()
    #Wait until all processes are finished
    for p in process_list:
        p.join()

    output = []
    while queue.qsize() != 0:
        output.append(queue.get())
    return np.mean(output)

if __name__ == '__main__':
    processes=np.arange(1,20,3)
    data=[[] for i in processes]

    ## L1 L2 cache data range
    sub_matrix_sizes = list(range(1,64,1))

    ## L3 cache data range
    #sub_matrix_sizes = list(range(32,4098,32))
    #sub_matrix_sizes.append(8192)
    
    for p_id,p in enumerate(processes):
        for size_0 in sub_matrix_sizes:
            data[p_id].append(speed_test(number_of_processes=p,size=size_0))
        print('{} of {} finished.'.format(p_id+1,len(processes)))

    from matplotlib import pyplot as plt
    from xframe.presenters.matplolibPresenter import plot1D
    data = np.array(data)
    sub_size_in_kb = np.array(sub_matrix_sizes)*np.dtype(complex).itemsize*np.prod(m_shape[1:])/1024
    sub_size_in_mb = sub_size_in_kb/1024

    fig,ax = plt.subplots()
    for d in data:
        ax.plot(sub_size_in_kb,d)
    
    ax.set_xlabel('Matrix Size in KB')
    #ax.set_xlabel('Matrix Size in MB')
    ax.set_ylabel('Runtime in seconds')
    fig.savefig('result.png')
    print('done.')

这很有趣,但我觉得讨论有点混乱,因为你一开始说“它更快”,但图表显示相反的结果。当人们知道更多进程计算更多数据时(一种弱扩展),这是有意义的。尽管如此,我认为对于这样的图表,强扩展性更直观一些。 - Jérôme Richard
1
但是,即使我们考虑单个进程的情况(深蓝色线条),图表显示,在Machine 1的情况下,通过迭代64 KB子矩阵来共轭矩阵'a'需要约0.016秒,而仅将矩阵分成两半(第二个图中x轴为MB的最右侧数据点)需要约0.042秒。因此,迭代许多小的64 KB矩阵似乎快了约2.6倍。也许这些图表有点令人困惑... - Tim
每个测试用例的两个图应该并排查看,因为第二个图在x轴上使用不同的采样继续了第一个图。 - Tim

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接