如何加速Python嵌套循环处理百万元素

3

我想匹配两个对象(一个数据集包含大约50万个元素,另一个包含大约200万个元素),这些对象满足某些条件,然后将这两个对象的信息保存到文件中。许多变量不涉及配对计算,但对于我的后续分析很重要,因此我需要跟踪这些变量并将它们保存下来。如果有方法可以矢量化整个分析过程,那么速度会更快。以下以随机数为例:

import numpy as np
from astropy import units as u
from astropy.coordinates import SkyCoord
from PyAstronomy import pyasl


RA1 = np.random.uniform(0,360,500000)
DEC1 = np.random.uniform(-90,90,500000)
d = np.random.uniform(55,2000,500000)
z = np.random.uniform(0.05,0.2,500000)
e = np.random.uniform(0.05,1.0,500000)
s = np.random.uniform(0.05,5.0,500000)
RA2 = np.random.uniform(0,360,2000000)
DEC2 = np.random.uniform(-90,90,2000000)
n = np.random.randint(10,10000,2000000)
m = np.random.randint(10,10000,2000000)

f = open('results.txt','a')
for i in range(len(RA1)):
    if i % 50000 == 0:
        print i
    ra1 = RA1[i]
    dec1 = DEC1[i]
    c1 = SkyCoord(ra=ra1*u.degree, dec=dec1*u.degree)
    for j in range(len(RA2)):
        ra2 = RA2[j]
        dec2 = DEC2[j]
        c2 = SkyCoord(ra=ra2*u.degree, dec=dec2*u.degree)

        ang = c1.separation(c2)
        sep = d[i] * ang.radian
        pa = pyasl.positionAngle(ra1, dec1, ra2, dec2)

        if sep < 1.5:
            np.savetxt(f,np.c_[ra1,dec1,sep,z[i],e[i],s[i],n[j],m[j]], fmt = '%1.4f   %1.4f   %1.4f   %1.4f   %1.4f   %1.4f   %i   %i')

1
首先,不要这样使用numpy,因为由于Python-C转换,numpy数组的索引相对较慢。Numpy应该以函数/向量化的方式使用。您应该通过尽可能多地委托给numpy的函数和方法来最小化纯Python运行时。您不应该只是将numpy.array用作list的替代品。 - Eli Korvigo
@EliKorvigo 谢谢。我正在尝试将整个计算向量化。但是我不知道该如何做得更好。 - Huanian Zhang
在您的程序中,您具有I/O访问权限。您可以通过在内存中创建一个中间缓冲区并定期刷新它(就像使用“print”语句一样)来将其最小化。 - Laurent LAPORTE
你正在使用Python 2.7:请将range替换为xrange - Laurent LAPORTE
5个回答

12

你需要问自己的基本问题是:能否减少数据集?

如果不能,我有一个坏消息:500000 * 2000000等于1e12。这意味着你要执行一万亿次操作。

角分离涉及到一些三角函数(我认为这里涉及到cossinsqrt),因此每个操作大约需要数百纳秒至微秒级。假设每个操作需要1微秒,你仍需要12天才能完成这个任务。而且这还假设你没有任何Python循环或IO开销,我认为1微秒对于这种操作是合理的。

但肯定有优化的方法:SkyCoord可以进行矢量化,但只支持1D:

# Create the SkyCoord for the longer array once
c2 = SkyCoord(ra=RA2*u.degree, dec=DEC2*u.degree)
# and calculate the seperation from each coordinate of the shorter list
for idx, (ra, dec) in enumerate(zip(RA1, DEC1)):
    c1 = SkyCoord(ra=ra*u.degree, dec=dec*u.degree)
    # x will be the angular seperation with a length of your RA2 and DEC2 arrays
    x = c1.separation(c2)

这将已经产生数个数量级的加速:

# note that I made these MUCH shorter
RA1 = np.random.uniform(0,360,5)
DEC1 = np.random.uniform(-90,90,5)
RA2 = np.random.uniform(0,360,10)
DEC2 = np.random.uniform(-90,90,10)

def test(RA1, DEC1, RA2, DEC2):
    """Version with vectorized inner loop."""
    c2 = SkyCoord(ra=RA2*u.degree, dec=DEC2*u.degree)
    for idx, (ra, dec) in enumerate(zip(RA1, DEC1)):
        c1 = SkyCoord(ra=ra*u.degree, dec=dec*u.degree)
        x = c1.separation(c2)

def test2(RA1, DEC1, RA2, DEC2):
    """Double loop."""
    for ra, dec in zip(RA1, DEC1):
        c1 = SkyCoord(ra=ra*u.degree, dec=dec*u.degree)
        for ra, dec in zip(RA2, DEC2):
            c2 = SkyCoord(ra=ra*u.degree, dec=dec*u.degree)
            x = c1.separation(c2)

%timeit test(RA1, DEC1, RA2, DEC2)  # 1 loop, best of 3: 225 ms per loop
%timeit test2(RA1, DEC1, RA2, DEC2) # 1 loop, best of 3: 2.71 s per loop

这已经快了10倍,而且它的可扩展性要好得多:

RA1 = np.random.uniform(0,360,5)
DEC1 = np.random.uniform(-90,90,5)
RA2 = np.random.uniform(0,360,2000000)
DEC2 = np.random.uniform(-90,90,2000000)

%timeit test(RA1, DEC1, RA2, DEC2)  # 1 loop, best of 3: 2.8 s per loop

# test2 scales so bad I only use 50 elements here
RA2 = np.random.uniform(0,360,50)
DEC2 = np.random.uniform(-90,90,50)
%timeit test2(RA1, DEC1, RA2, DEC2)  # 1 loop, best of 3: 11.4 s per loop
请注意,通过向量化内部循环,我能够在1/4的时间内计算出40000倍以上的元素。因此,通过向量化内部循环,你应该能够快大约200k倍。
在这里,我们在3秒钟内计算了5次200万个分离操作,所以每个操作大约需要300纳秒。以这个速度,完成这个任务需要3天时间。
即使你也可以消除剩余循环中的向量化,我认为这不会产生任何巨大的加速,因为在那个层面上,循环开销远小于每个循环中的计算时间。使用 line-profiler 也支持这种说法。
Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    11                                           def test(RA1, DEC1, RA2, DEC2):
    12         1       216723 216723.0      2.6      c2 = SkyCoord(ra=RA2*u.degree, dec=DEC2*u.degree)
    13         6          222     37.0      0.0      for idx, (ra, dec) in enumerate(zip(RA1, DEC1)):
    14         5       206796  41359.2      2.5          c1 = SkyCoord(ra=ra*u.degree, dec=dec*u.degree)
    15         5      7847321 1569464.2     94.9          x = c1.separation(c2)

如果从这个Hits中看不出来,它是从5个2,000,000次运行中得出的结果。为了比较,这里是从一个5x20次运行中得到的test2结果:

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    17                                           def test2(RA1, DEC1, RA2, DEC2):
    18         6           80     13.3      0.0      for ra, dec in zip(RA1, DEC1):
    19         5       195030  39006.0      0.6          c1 = SkyCoord(ra=ra*u.degree, dec=dec*u.degree)
    20       105         1737     16.5      0.0          for ra, dec in zip(RA2, DEC2):
    21       100      3871427  38714.3     11.8              c2 = SkyCoord(ra=ra*u.degree, dec=dec*u.degree)
    22       100     28870724 288707.2     87.6              x = c1.separation(c2)
test2test1效率更低的原因是,c2 = SkyCoord部分占用了总时间的12%,而不仅仅是2.5%,每个单独调用separation都有一定的开销。因此它变慢的原因并不是Python循环的开销,而是SkyCoord构造函数和separation的静态部分。

显然需要将pa计算和保存到文件的过程向量化(我没有使用过PyAstronomynumpy.savetext,所以无法给出建议)。

但是,仍然存在一个问题,即在普通计算机上进行一万亿次三角函数运算是不可行的。

一些减少时间的额外想法:

  • 使用多处理器,使您计算机的每个核心并行工作,理论上可以通过您的核心数量加速运行。实际情况下,这可能达不到目标,并且我建议仅在拥有>= 8个内核或可用集群时才执行此操作。否则,花费在正确地获取多处理器运行的时间可能超过单核心运行时间。特别是因为多处理器可能无法正常工作,然后您必须重新运行计算。

  • 预处理元素:删除RA或DEC差异使匹配变得不可能的项目。但是,如果这不能删除一部分元素,则额外的减法和比较实际上可能会降低速度。


非常感谢。这是一个很好的想法,可以继续前进。我认为加快速度的一种方法是在计算分离之前对 delta_ra = ra1 - ra2 < 2 度 进行切割。有一件事我不确定的是当我们向量化 c2 时,如何跟踪索引,因为我需要与 nm 相对应的信息。 - Huanian Zhang
@HuanianZhang 如果有帮助,请随意点赞。关于你的问题:您可以使用(如果预处理,则可选地切片)np.arange(RA2.size)数组。这将保存表示最终(x)数组中的值的索引。 - MSeifert

2

以下是使用内存缓冲区来减少I/O的实现。注意:我更喜欢使用io模块进行文件输入/输出,以便与Python 3更兼容。我认为这是最佳实践。使用它不会降低性能。

import io

with io.open('results.txt', 'a') as f:
    buf = io.BytesIO()
    for i in xrange(len(RA1)):
        if i % 50000 == 0:
            print(i)
            f.write(buf.getvalue())
            buf.truncate(0)
        ra1 = RA1[i]
        dec1 = DEC1[i]
        c1 = SkyCoord(ra=ra1 * u.degree, dec=dec1 * u.degree)
        for j in xrange(len(RA2)):
            ra2 = RA2[j]
            dec2 = DEC2[j]
            c2 = SkyCoord(ra=ra2 * u.degree, dec=dec2 * u.degree)

            ang = c1.separation(c2)
            sep = d[i] * ang.radian
            pa = pyasl.positionAngle(ra1, dec1, ra2, dec2)

            if sep < 1.5:
                np.savetxt(buf, np.c_[ra1, dec1, sep, z[i], e[i], s[i], n[j], m[j]],
                           fmt='%1.4f   %1.4f   %1.4f   %1.4f   %1.4f   %1.4f   %i   %i')
    f.write(buf.getvalue())

注意:在Python 2中,我使用xrange而不是range来减少内存使用。 buf.truncate(0)可以被以下新实例替换:buf = io.BytesIO()。这样可能更有效率...

谢谢。我试过了,但速度并没有显著提高。它太慢了。如果使用这种暴力嵌套循环来运行如此庞大的数据,可能需要一年的时间。 - Huanian Zhang

2

加速的第一种方法是:对于ra2和dec2中的每一对,计算SkyCoord c2。你可以通过创建一个SkyCoord的缓冲数组来提高速度:

f = open('results.txt','a')
C1 = [SkyCoord(ra=ra1*u.degree, dec=DEC1[i]*u.degree) 
      for i, ra1 in enumerate(RA1)] )
C2 = [SkyCoord(ra=ra2*u.degree, dec=DEC2[i]*u.degree) 
      for i, ra2 in enumerate(RA2)] )  # buffer coords

for i, c1 in enumerate(C1):  # we only need enumerate() to get i
    for j, c2 in enumerate(C2):
        ang = c1.separation(c2)  # note we don't have to calculate c2
        if d[i] < 1.5 / ang.radian:
            # now we don't have to multiply every iteration. 
            # The right part is a constant

            # the next line is only executed if objects are close enough
            pa = pyasl.positionAngle(RA1[i], DEC1[i], RA2[j], DEC2[j])
            np.savetxt('...whatever')

你可以通过阅读SkyCoord.separation代码并对其进行向量化来加速处理,以替代SkyCoord,但我自己太懒了。如果我们有两个2xN坐标矩阵x1、x2,它看起来会类似于(Matlab/Octave):
cos = pdist2(x1, x2) / (sqrt(dot(x1, x1)) * sqrt(dot(x2, x2))) 

谢谢。看起来两个循环并没有显著加速它。 - Huanian Zhang

1
假设您想将数据集减少到小于2度的差异(根据您的评论),您可以通过广播创建一个掩码(可能需要分块进行,但方法相同)。
aMask=(abs(RA1[:,None]-RA2[None,:])<2)&(abs(DEC1[:,None]-DEC2[None,:])<2)

在一些小规模的测试中,这将减少约1/5000的数据集。然后创建一个掩码的位置数组。
locs=np.where(aMask)

(array([   0,    2,    4, ..., 4998, 4999, 4999], dtype=int32),
 array([3575, 1523, 1698, ..., 4869, 1801, 2792], dtype=int32))

(来自我的5k x 5k测试)。将所有其他变量通过,例如,d [locs [0]]转储为1d数组,您可以按照@MSeifert的答案将其推送到SkyCoord中。

当您获得输出并与1.5进行比较时,您将获得一个布尔值bmask,然后您可以outlocs = locs [0] [bmask]并输出RA1 [outlocs]等。

我曾尝试在FEM分析中对壳体进行空间导数,其中采用所有数据点之间的完整排名同样低效。


非常感谢。你能否再扩展一下你的代码,让我更好地理解它? - Huanian Zhang
当我尝试使用0.5百万* 2百万的数组时,aMask =(abs(RA1 [:,None] - RA2 [None,:])<2)&(abs(DEC1 [:,None] - DEC2 [None,:])<2)会返回MemoryError - Huanian Zhang
抱歉,您可能需要设置一个循环来以RA1和DEC1为块进行操作,例如aMask=(abs(RA1[0:100,None]-RA2[None,:])<2)&(abs(DEC1[0:100,None]-DEC2[None,:])<2)。这个块的大小受到RAM的限制。您还可以选择将其余的 *1 数组存储进pickle以节省RAM空间,让您在一次操作中更多地进行屏蔽和SkyCoord处理。 - Daniel F
正如MSiefert所指出的,500kx2M相当于1e12,因此完整秩的布尔矩阵在计算之前将占用125GB的内存。一个1000元素的切片将占用250MB,这应该是可行的,如果您这样重新分配它:aMask=np.where(aMask),它将变得更小,因此您可以将更大的数组推送到SkyCoords。 - Daniel F
此外,查看文档,PyAstronomy.pyasl.getAngDist 可能更有效。 - Daniel F
而且,search_around_sky 似乎是您想要一步完成的所有操作。 - Daniel F

-1

savetxt 这样使用本质上是

astr = fmt % (ra1,dec1,sep,z[i],e[i],s[i],n[j],m[j])
astr += '\n'  # or include in fmt
f.write(astr)

也就是说,只需将格式化的行写入文件即可。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接