Python - 如何加速计算城市之间的距离

5

我数据库中有55249个城市,每个城市都有经纬度值。我想要计算每个城市到其他所有城市的距离,并存储那些距离不超过30公里的城市。以下是我的算法:

# distance function
from math import sin, cos, sqrt, atan2, radians

def distance(obj1, obj2):
    lat1 = radians(obj1.latitude)
    lon1 = radians(obj1.longitude)
    lat2 = radians(obj2.latitude)
    lon2 = radians(obj2.longitude)
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = (sin(dlat/2))**2 + cos(lat1) * cos(lat2) * (sin(dlon/2))**2
    c = 2 * atan2(sqrt(a), sqrt(1-a))
    return round(6373.0 * c, 2)

def distances():
    cities = City.objects.all()  # I am using Django ORM
    for city in cities:
        closest = list()
        for tested_city in cities:
            distance = distance(city, tested_city)
            if distance <= 30. and distance != 0.:
                closest.append(tested_city)
        city.closest_cities.add(*closest)  # again, Django thing
        city.save()  # Django

这个可以运行但是需要花费很长时间。可能需要几周才能完成。有没有什么方法可以加快速度?


1
乍一看,如果您有n个城市,则可以通过某些因素来减少它,您可以测试所有n * n可能的匹配,但这并不是必要的,因为从城市A到B等于从城市B到A。 - Pham Trung
1
距离矩阵计算总是需要很长时间,因为它具有二次复杂度。在这里,我只看到两种优化方式:只计算矩阵的一半,因为它是对称的;或者使用Python特定的距离计算函数。 - Andrey Shokhin
我不需要进行地理编码。我已经拥有所有的纬度和经度。 - gwaramadze
2
你可以通过检查“dlat”是否相应于小于30公里弧以及将类似的启发式方法应用于“dlon”,并使用大于30的(虚拟)值尽早退出距离计算。这样,你可以节省大量昂贵的三角函数调用。 - M Oehm
1
制作30km x 30km的方块,首先确定每个城市位于哪个方块中。然后每个城市与其所在方块内的所有城市距离都小于30km,并且只需要与周围方块中的城市进行比较。将所有内容放入PostGIS中进行处理会比在Python中处理更快。 - RemcoGerlich
显示剩余6条评论
5个回答

7
你无法计算每对城市之间的距离。相反,你需要将城市放入一个空间划分数据结构中,以便可以进行快速的最近邻查询。空间划分数据结构 SciPy带有适用于此应用程序的kd-tree实现,scipy.spatial.KDTree
这里有两个困难。首先,scipy.spatial.KDTree使用点之间的欧几里得距离,但你想要使用沿着地球表面的大圆距离。其次,经度会环绕,因此最近邻可能具有相差360°的经度。如果采取以下方法,则可以解决这两个问题:
  1. 将您的位置从大地坐标纬度经度)转换为ECEF(地心坐标系)坐标(xyz)。

  2. 将这些ECEF坐标放入{{link3:scipy.spatial.KDTree}}中。

  3. 将您的大圆距离(例如30千米)转换为欧几里得距离。

  4. 调用{{link4:scipy.spatial.KDTree.query_ball_point}}以获取范围内的城市。

这里有一些示例代码来说明这种方法。函数geodetic2ecef来自David Parunakian的PySatel,并在GPL下获得许可。
from math import radians, cos, sin, sqrt

# Constants defined by the World Geodetic System 1984 (WGS84)
A = 6378.137
B = 6356.7523142
ESQ = 6.69437999014 * 0.001

def geodetic2ecef(lat, lon, alt=0):
    """Convert geodetic coordinates to ECEF."""
    lat, lon = radians(lat), radians(lon)
    xi = sqrt(1 - ESQ * sin(lat))
    x = (A / xi + alt) * cos(lat) * cos(lon)
    y = (A / xi + alt) * cos(lat) * sin(lon)
    z = (A / xi * (1 - ESQ) + alt) * sin(lat)
    return x, y, z

def euclidean_distance(distance):
    """Return the approximate Euclidean distance corresponding to the
    given great circle distance (in km).

    """
    return 2 * A * sin(distance / (2 * B))

让我们随机生成五万个城市位置并将它们转换为ECEF坐标:

>>> from random import uniform
>>> cities = [(uniform(-90, 90), uniform(0, 360)) for _ in range(50000)]
>>> ecef_cities = [geodetic2ecef(lat, lon) for lat, lon in cities]

将它们放入 scipy.spatial.KDTree 中:
>>> import numpy
>>> from scipy.spatial import KDTree
>>> tree = KDTree(numpy.array(ecef_cities))

查找距离伦敦约100公里的所有城市:

>>> london = geodetic2ecef(51, 0)
>>> tree.query_ball_point([london], r=euclidean_distance(100))
array([[37810, 15755, 16276]], dtype=object)

这个数组包含了你查询的每个点周围距离r内的城市数组。每个邻居都以其在传递给KDTree的原始数组中的索引表示。因此,在伦敦周围约100公里范围内有三个城市,即原始列表中索引为37810、15755和16276的城市:

>>> from pprint import pprint
>>> pprint([cities[i] for i in [37810, 15755, 16276]])
[(51.7186871990946, 359.8043453670437),
 (50.82734317063884, 1.1422052710187103),
 (50.95466110717763, 0.8956257749604779)]

注意:

  1. You can see from the example output that neighbours with longitudes that differ by about 360° are correctly discovered.

  2. The approach seems fast enough. Here we find neighbours within 30 km for the first thousand cities, taking about 5 seconds:

    >>> from timeit import timeit
    >>> timeit(lambda:tree.query_ball_point(ecef_cities[:1000], r=euclidean_distance(30)), number=1)
    5.013611573027447
    

    Extrapolating, we expect to find neighbours within 30 km for all 50,000 cities in about four minutes.

  3. My euclidean_distance function overestimates the Euclidean distance corresponding to a given great circle distance (so as not to miss any cities). This might be good enough for some applications—after all, cities are not point objects—but if you need more accuracy than this, then you could filter the resulting points using, say, one of the great circle distance functions from geopy.


这是一种解决方法。我已经取得了一些进展,现在我需要将结果与对象匹配。我想我能处理好它。生成的数组是否对应于城市的索引?也就是说,伦敦的结果是:cities [37810]、cities [15755]和cities [16276],是吗? - gwaramadze
是的,没错。文档中说,"如果x是单个点,则返回x的邻居索引列表。如果x是一个点数组,则返回一个形状元组的对象数组,其中包含邻居列表。" - Gareth Rees
谢谢,我想我已经让它工作了。对于单个城市,查找所有匹配项大约需要0.05秒。再加上约15秒的数据库操作(建立关系),我应该在大约15小时内完成。这是完全可以接受的。我还没有彻底测试数据。似乎返回的命中率比以前的算法略高。稍微超过30公里的值也将完全没问题。再次感谢,你真是太棒了。 - gwaramadze
我敢打赌,你也可以通过计算所有城市的关系,然后使用数据库的批量更新工具来加速数据库操作。(一次性进行大规模更新而不是成千上万个小更新。)但如果你需要帮助,请提出另一个问题。 - Gareth Rees
我想做的正是这个问题所问的事情;创建一个比X更接近的位置列表,而且这个答案的代码完美地运行了。有一件事。我注意到SciPy中的KDTree现在有一个query_pairs函数。你传递一个距离,它返回一个索引列表,在这些位置上,它们彼此之间的距离比传递的距离更近。看起来比query_ball_point更直观。这是文档:http://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.KDTree.query_pairs.html#scipy-spatial-kdtree-query-pairs - Jibran

4

如果您知道两个城市之间的距离超过30公里,那么可以通过不输入复杂的三角函数公式来加快距离计算。因为它们的纬度差对应着超过30公里的弧长。长度为a=30公里的弧相应的角度为a/r=0.00470736,因此:

def distance(obj1, obj2):
    lat1 = radians(obj1.latitude)
    lon1 = radians(obj1.longitude)
    lat2 = radians(obj2.latitude)
    lon2 = radians(obj2.longitude)
    dlon = lon2 - lon1
    dlat = lat2 - lat1

    if dlat > 0.00471:
        return 32

    a = (sin(dlat/2))**2 + cos(lat1) * cos(lat2) * (sin(dlon/2))**2
    c = 2 * atan2(sqrt(a), sqrt(1-a))
    return round(6373.0 * c, 2)

半径32只是一个虚拟值,表示城市间的距离大于30公里。对于经度,您应该应用类似的逻辑,并考虑最大绝对纬度:

    if cos(lat1) * dlon > 0.00471 and cos(lat2) * dlon > 0.00471:
        return 32

如果您知道您的城市处于固定的纬度范围内,那么您可以将常数限制调整到最坏情况。例如,如果您所有的城市都位于美国本土,它们应该在北纬49°以下,然后您的限制就是0.00471 / cos(49°) = 0.00718。

    if dlon > 0.00718:
        return 32

这个更简单的标准意味着您正在为德克萨斯州或佛罗里达州的太多城市输入精确计算。您也可以将这些条件连接起来。先使用近似极限,然后根据最大绝对纬度使用精确极限,接下来计算所有剩余候选者的精确距离。
您可以预先使用最大绝对纬度计算此限制。这种启发式方法还应该帮助您将城市放入具有固定经度和纬度的桶中,就像RemcoGerlich建议的那样。他的方法应该通过事先考虑合理的城市对来显着加快您的过程。 编辑 我有点羞愧地看到我上面的代码没有检查极限的绝对值。无论如何,这里真正的教训是,无论您如何加速距离计算,对于大型数据集,真正的好处来自选择智能搜索机制,如评论者建议的桶搜索或kd树,可能连同一些备忘录以除去双重检查。

我的波兰最北点是54.8358。因此,0.00471 / cos(radians(54.8358)) = 0.00818。我在距离函数中放置了适当的if语句,执行时间几乎没有改变。我一定做错了什么。无论如何,谢谢!我会继续研究它。 - gwaramadze
在将纬度和经度转换为弧度之前,进行合理性检查不是应该可以做到的吗? - Jim Mischel
我用一个较小的波兰城市列表(大约500个城市)再次进行了检查,它确实有所不同,与仅使用纬度标准版本相比提高了约25%,与原始版本相比提高了约50%。输出相同。(但我首先犯了一个复制和粘贴错误:我检查了经度两次,在此之前应该首先检查 dlat,然后是dlon。) - M Oehm
1
@JimMischel 是的,这是可能的。该过程可以通过为每个城市预先计算弧度角度、预先计算纬度余弦值以及用本地值的简单乘法替换 '**' 运算符来进一步加快速度,例如 'slat = sin(0.5*dlat)' 然后 a = slat*slat + ... - M Oehm

3
我会先创建"区块",每个区块由两个纬度之间相隔X公里和两个经度之间相隔X公里的范围限定。X应该尽可能大,但有一个限制条件:每个区块内所有城市之间距离不能超过30公里。

这些区块可以被存储在一个数组中:

Sector[][] sectors;

在这个数组中,很容易找到包含特定坐标的区域。同时,也很容易找到一个特定区域的相邻区域。
然后:
(1)为每个城市分配其所在的区域。每个区域都有一个城市列表。
(2)对于每个城市,找出其所在区域内符合30公里标准的所有城市。
(3)对于每个城市C,在所有8个相邻区域中找到所有城市C'。对于每个C',检查C-C'的距离,并输出C-C'如果它小于30公里。
这个算法仍然是O(n^2),但应该会更快,因为对于每个城市,您只需要检查整个集合的一小部分。

这种方法应该可以加速搜索。问题在于我们只有城市的经纬度,没有“平面”笛卡尔坐标系。一种解决方案可能是扩大相邻条件,这样你只需在赤道附近东西方向上直接查看相邻的区域,但如果靠近极地,你需要查看东西方向上接下来的两三个区域。区域数量取决于纬度。在纬度方向上,我们只需检查直接在南北方向上的区域带。 - M Oehm
@MOehm 没错,我漏掉了那个。另一个解决方案可能是使扇区宽度依赖于纬度。这会稍微复杂化算法,但不应改变计算复杂度。 - Bartosz Klimek

2
  1. 尝试不同的算法来加速单个距离计算。
  2. 只使用城市的排列方式,不重新计算重复项。
  3. 使用multiprocessing模块将工作分配到多个核心上。

1和2很简单。对于第三点,我建议使用imap_unordered()来实现最大速度,并采用以下类似的工作流程:

  • 获取所有排列方式
  • 在主循环中,通过单个数据存储调用加载所有城市模型
  • 将距离计算分布到工作池中
  • 尝试在单个工作线程中或主线程中保存结果。我怀疑在工作线程中更好,但我不知道django如何处理脱机脚本的并发性(如果有人知道,请在评论中添加以便我们集成)。
  • 不管你是在主线程中还是在单个工作线程中保存,都尝试使用事务保存大块的模型。

但是

你还需要修改你的模型。为了实现分布式处理,你需要解除closest_cities变量的耦合。因为不同的进程将更改它。你可以在主进程级别使用一个字典列表存储任何给定城市的所有最近城市作为键,然后在循环结束或同时存储每个模型时将其存储。


我们能把这个转换成(x,y)吗?如果可能的话,我们可以根据x或y轴对它们进行排序,这将限制比较的范围。 - Pham Trung
@PhamTrung 很好的观点...并且正如M Oehm所建议的那样,可以很容易地完成这个过程,只需假设一个30公里弧形的经纬度差值,而无需重新计算。 - Paolo Casciello

0

你正在做大量不必要的工作。

正如其他人建议的那样,你可以通过改变循环结构来限制计算的数量。你有:

for city in cities:
    for tested_city in cities:

因此,您不仅会将每个城市与自身进行比较,还会将city1city2进行比较,稍后又会将city2city1进行比较。

我不是Python程序员,所以无法告诉您在此处使用哪种语法,但您需要的是类似于嵌套循环结构:

for (i = 0; i < cities.Length-1; ++i)
{
    for (j = i+1; j < cities.Length; ++j)
    {
        compare_cities(cities[i], cities[j]);
    }
}

这将减少您需要执行的城市比较数量一半。这将把大约30亿个距离计算减少到大约15亿个。

其他人也提到了在进入昂贵的三角函数之前比较dlatdlong的可能性。

您还可以通过将lat1lon1转换为弧度一次,并且仅计算一次cos(lat1)并将这些值传递给您的距离计算,而不是每次都计算它们来节省一些时间。例如:

for (i = 0; i < cities.Length-1; ++i)
{
    lat1 = radians(cities[i].latitude
    lon1 = radians(cities[i].longitude
    cos1 = cos(lat1)
    for (j = i+1; j < cities.Length; ++j)
    {
        compare_cities(lat1, lon1, cos1, cities[j]);
    }
}

你其实不需要将c转换为千米。例如,你有:

return round(6373.0 * c, 2)

结果必须是<= 30.0。为什么要乘法和四舍五入?你可以直接return c,然后在你的代码中将返回值与0.0047(即30.0/6373)进行比较。


谢谢您的回答。非常好的观点。我在其他地方也使用了距离函数,因此增加了开销。 - gwaramadze

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接