Fast calculation of the Pareto front in Python


I have a set of points in 3-dimensional space and need to find the Pareto frontier among them. Execution speed is very important here; the runtime grows very quickly as I add more test points.

The set of points looks like this:

[[0.3296170319979843, 0.0, 0.44472108843537406], [0.3296170319979843, 0.0, 0.44472108843537406], [0.32920760896951373, 0.0, 0.4440408163265306], [0.32920760896951373, 0.0, 0.4440408163265306], [0.33815192743764166, 0.0, 0.44356462585034007]]

Right now I'm using this algorithm:

def dominates(row, candidateRow):
    return sum([row[x] >= candidateRow[x] for x in range(len(row))]) == len(row) 

def simple_cull(inputPoints, dominates):
    paretoPoints = set()
    candidateRowNr = 0
    dominatedPoints = set()
    while True:
        candidateRow = inputPoints[candidateRowNr]
        inputPoints.remove(candidateRow)
        rowNr = 0
        nonDominated = True
        while len(inputPoints) != 0 and rowNr < len(inputPoints):
            row = inputPoints[rowNr]
            if dominates(candidateRow, row):
                # If it is worse on all features remove the row from the array
                inputPoints.remove(row)
                dominatedPoints.add(tuple(row))
            elif dominates(row, candidateRow):
                nonDominated = False
                dominatedPoints.add(tuple(candidateRow))
                rowNr += 1
            else:
                rowNr += 1

        if nonDominated:
            # add the non-dominated point to the Pareto frontier
            paretoPoints.add(tuple(candidateRow))

        if len(inputPoints) == 0:
            break
    return paretoPoints, dominatedPoints

Found here: http://code.activestate.com/recipes/578287-multidimensional-pareto-front/

How can I quickly find the set of non-dominated solutions within a set of solutions? In other words, can Python do better than this algorithm?

7 Answers


If you're concerned about actual speed, you definitely want to use numpy (since clever algorithmic tweaks probably gain far less than switching to array operations). Here are three solutions that all compute the same function. The is_pareto_efficient_dumb solution is slower in most situations but becomes faster as the number of costs increases, the is_pareto_efficient_simple solution is much more efficient than the dumb one for many points, and the final is_pareto_efficient function is less readable but the fastest (so all three are Pareto efficient!).

import numpy as np


# Very slow for many datapoints.  Fastest for many costs, most readable
def is_pareto_efficient_dumb(costs):
    """
    Find the pareto-efficient points
    :param costs: An (n_points, n_costs) array
    :return: A (n_points, ) boolean array, indicating whether each point is Pareto efficient
    """
    is_efficient = np.ones(costs.shape[0], dtype = bool)
    for i, c in enumerate(costs):
        is_efficient[i] = np.all(np.any(costs[:i]>c, axis=1)) and np.all(np.any(costs[i+1:]>c, axis=1))
    return is_efficient


# Fairly fast for many datapoints, less fast for many costs, somewhat readable
def is_pareto_efficient_simple(costs):
    """
    Find the pareto-efficient points
    :param costs: An (n_points, n_costs) array
    :return: A (n_points, ) boolean array, indicating whether each point is Pareto efficient
    """
    is_efficient = np.ones(costs.shape[0], dtype = bool)
    for i, c in enumerate(costs):
        if is_efficient[i]:
            is_efficient[is_efficient] = np.any(costs[is_efficient]<c, axis=1)  # Keep any point with a lower cost
            is_efficient[i] = True  # And keep self
    return is_efficient


# Faster than is_pareto_efficient_simple, but less readable.
def is_pareto_efficient(costs, return_mask = True):
    """
    Find the pareto-efficient points
    :param costs: An (n_points, n_costs) array
    :param return_mask: True to return a mask
    :return: An array of indices of pareto-efficient points.
        If return_mask is True, this will be an (n_points, ) boolean array
        Otherwise it will be a (n_efficient_points, ) integer array of indices.
    """
    is_efficient = np.arange(costs.shape[0])
    n_points = costs.shape[0]
    next_point_index = 0  # Next index in the is_efficient array to search for
    while next_point_index<len(costs):
        nondominated_point_mask = np.any(costs<costs[next_point_index], axis=1)
        nondominated_point_mask[next_point_index] = True
        is_efficient = is_efficient[nondominated_point_mask]  # Remove dominated points
        costs = costs[nondominated_point_mask]
        next_point_index = np.sum(nondominated_point_mask[:next_point_index])+1
    if return_mask:
        is_efficient_mask = np.zeros(n_points, dtype = bool)
        is_efficient_mask[is_efficient] = True
        return is_efficient_mask
    else:
        return is_efficient

Profiling tests (using points drawn from a normal distribution):

With 10,000 samples, 2 costs:

is_pareto_efficient_dumb: Elapsed time is 1.586s
is_pareto_efficient_simple: Elapsed time is 0.009653s
is_pareto_efficient: Elapsed time is 0.005479s

With 1,000,000 samples, 2 costs:

is_pareto_efficient_dumb: Really, really, slow
is_pareto_efficient_simple: Elapsed time is 1.174s
is_pareto_efficient: Elapsed time is 0.4033s

With 10,000 samples, 15 costs:

is_pareto_efficient_dumb: Elapsed time is 4.019s
is_pareto_efficient_simple: Elapsed time is 6.466s
is_pareto_efficient: Elapsed time is 6.41s
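
The timing harness itself isn't shown in the answer; a minimal sketch of how such numbers could be produced (my assumption about the setup, using normally distributed points as stated above) might look like this:

import time
import numpy as np

costs = np.random.randn(10_000, 2)   # e.g. 10,000 samples, 2 costs

for fn in (is_pareto_efficient_dumb, is_pareto_efficient_simple, is_pareto_efficient):
    start = time.time()
    fn(costs)
    print('{}: Elapsed time is {:.4g}s'.format(fn.__name__, time.time() - start))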

Note that if efficiency is a concern, you can get roughly a 2x speedup by reordering the data beforehand; see here.
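
The link behind "see here" is not preserved in this copy, so the following is only my guess at what the reordering could mean: sort the rows by their total cost so that points likely to dominate many others are scanned first, then map the mask back to the original order.

import numpy as np

def is_pareto_efficient_reordered(costs):
    # Hypothetical pre-ordering: points with the smallest cost sums are
    # likely to dominate many others, so scan them first.
    order = costs.sum(axis=1).argsort()
    efficient = is_pareto_efficient(costs[order], return_mask=True)
    mask = np.empty_like(efficient)
    mask[order] = efficient          # map the result back to the original row order
    return mask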


Wow, I had missed this, thanks Peter! But I'm not quite sure how to use the costs array - could you give a short example? Thanks again, this looks great. - Lucien S.
The costs array is just a 2D array where cost[i, j] is the j-th cost of the i-th data point. I believe it is the same as your inputPoints array. You can see tests demonstrating its usage here: https://github.com/QUVA-Lab/artemis/blob/master/artemis/general/test_pareto_efficiency.py - Peter
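
To make this concrete, here is a small usage sketch (my own illustration, not from the original answer). The functions above treat lower costs as better; the question's dominates uses >=, i.e. larger coordinates appear to be better, so the costs are negated first:

import numpy as np

# The question's points, one row per point (duplicates dropped for brevity)
input_points = [
    [0.3296170319979843, 0.0, 0.44472108843537406],
    [0.32920760896951373, 0.0, 0.4440408163265306],
    [0.33815192743764166, 0.0, 0.44356462585034007],
]

costs = np.array(input_points)        # shape (n_points, n_costs)

# Negate because these functions minimise, while the question maximises.
mask = is_pareto_efficient_simple(-costs)
print(costs[mask])                    # the Pareto-efficient rows
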
I tested with a simple example and the first two functions did not return the Pareto front. The example is: numpy.array([[1,2], [3,4], [2,1], [1,1]]). It returns [ True False True True], but by the definition of the Pareto front it should return [False False False True]. - hyperionb
The fix for def is_pareto_efficient(costs) looks like this (replacing the line mentioned): is_efficient[is_efficient] = np.any(costs[is_efficient]<c, axis=1)  # Remove dominated points, followed by is_efficient[i] = True - hyperionb
Could you share a reference to a scientific paper on these algorithms? - w4bo


August 2019 update

Here is another simple implementation that is fairly fast for modest dimensions. The input points are assumed to be unique.

def keep_efficient(pts):
    'returns Pareto efficient row subset of pts'
    # sort points by decreasing sum of coordinates
    pts = pts[pts.sum(1).argsort()[::-1]]
    # initialize a boolean mask for undominated points
    # to avoid creating copies each iteration
    undominated = np.ones(pts.shape[0], dtype=bool)
    for i in range(pts.shape[0]):
        # process each point in turn
        n = pts.shape[0]
        if i >= n:
            break
        # find all points not dominated by i
        # since points are sorted by coordinate sum
        # i cannot dominate any points in 1,...,i-1
        undominated[i+1:n] = (pts[i+1:] >= pts[i]).any(1) 
        # keep points undominated so far
        pts = pts[undominated[:n]]
    return pts

We first sort the points by decreasing sum of coordinates. This is useful because:
- For many distributions of data, a point with the largest coordinate sum will dominate a large number of points.
- If point x has a larger coordinate sum than point y, then y cannot dominate x (dominating x would require y to be at least as large in every coordinate, which would force y's sum to be at least as large as x's).
Here are some benchmarks against Peter's answer, using np.random.randn.
N=10000 d=2

keep_efficient
1.31 ms ± 11.6 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
is_pareto_efficient
6.51 ms ± 23.9 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


N=10000 d=3

keep_efficient
2.3 ms ± 13.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
is_pareto_efficient
16.4 ms ± 156 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


N=10000 d=4

keep_efficient
4.37 ms ± 38.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
is_pareto_efficient
21.1 ms ± 115 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


N=10000 d=5

keep_efficient
15.1 ms ± 491 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
is_pareto_efficient
110 ms ± 1.01 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


N=10000 d=6

keep_efficient
40.1 ms ± 211 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
is_pareto_efficient
279 ms ± 2.54 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


N=10000 d=15

keep_efficient
3.92 s ± 125 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
is_pareto_efficient
5.88 s ± 74.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Convex hull heuristic

I looked into this problem recently and found a useful heuristic that works well if the points are independently distributed and the number of dimensions is small.

The idea is to compute the convex hull of the points. With few dimensions and independently distributed points, the convex hull will have a small number of vertices. Intuitively, we can expect some vertices of the convex hull to dominate many of the original points. Moreover, if a point of the convex hull is not dominated by any other point of the convex hull, then it is not dominated by any point of the original set either.

This gives a simple iterative algorithm. We repeatedly:

  1. Compute the convex hull.
  2. Save the Pareto-undominated points of the convex hull.
  3. Filter the remaining points to remove those dominated by elements of the convex hull.

I added some benchmarks for 3 dimensions. It appears that for some distributions of points this approach yields better asymptotics.

import numpy as np
from scipy import spatial
from functools import reduce

# test points
pts = np.random.rand(10_000_000, 3)


def filter_(pts, pt):
    """
    Get all points in pts that are not Pareto dominated by the point pt
    """
    weakly_worse   = (pts <= pt).all(axis=-1)
    strictly_worse = (pts < pt).any(axis=-1)
    return pts[~(weakly_worse & strictly_worse)]


def get_pareto_undominated_by(pts1, pts2=None):
    """
    Return all points in pts1 that are not Pareto dominated
    by any points in pts2
    """
    if pts2 is None:
        pts2 = pts1
    return reduce(filter_, pts2, pts1)


def get_pareto_frontier(pts):
    """
    Iteratively filter points based on the convex hull heuristic
    """
    pareto_groups = []

    # loop while there are points remaining
    while pts.shape[0]:
        # brute force if there are few points:
        if pts.shape[0] < 10:
            pareto_groups.append(get_pareto_undominated_by(pts))
            break

        # compute vertices of the convex hull
        hull_vertices = spatial.ConvexHull(pts).vertices

        # get corresponding points
        hull_pts = pts[hull_vertices]

        # get points in pts that are not convex hull vertices
        nonhull_mask = np.ones(pts.shape[0], dtype=bool)
        nonhull_mask[hull_vertices] = False
        pts = pts[nonhull_mask]

        # get points in the convex hull that are on the Pareto frontier
        pareto   = get_pareto_undominated_by(hull_pts)
        pareto_groups.append(pareto)

        # filter remaining points to keep those not dominated by
        # Pareto points of the convex hull
        pts = get_pareto_undominated_by(pts, pareto)

    return np.vstack(pareto_groups)

# --------------------------------------------------------------------------------
# previous solutions
# --------------------------------------------------------------------------------

def is_pareto_efficient_dumb(costs):
    """
    :param costs: An (n_points, n_costs) array
    :return: A (n_points, ) boolean array, indicating whether each point is Pareto efficient
    """
    is_efficient = np.ones(costs.shape[0], dtype = bool)
    for i, c in enumerate(costs):
        is_efficient[i] = np.all(np.any(costs>=c, axis=1))
    return is_efficient


def is_pareto_efficient(costs):
    """
    :param costs: An (n_points, n_costs) array
    :return: A (n_points, ) boolean array, indicating whether each point is Pareto efficient
    """
    is_efficient = np.ones(costs.shape[0], dtype = bool)
    for i, c in enumerate(costs):
        if is_efficient[i]:
            is_efficient[is_efficient] = np.any(costs[is_efficient]<=c, axis=1)  # Remove dominated points
    return is_efficient


def dominates(row, rowCandidate):
    return all(r >= rc for r, rc in zip(row, rowCandidate))


def cull(pts, dominates):
    dominated = []
    cleared = []
    remaining = pts
    while remaining:
        candidate = remaining[0]
        new_remaining = []
        for other in remaining[1:]:
            [new_remaining, dominated][dominates(candidate, other)].append(other)
        if not any(dominates(other, candidate) for other in new_remaining):
            cleared.append(candidate)
        else:
            dominated.append(candidate)
        remaining = new_remaining
    return cleared, dominated

# --------------------------------------------------------------------------------
# benchmarking
# --------------------------------------------------------------------------------

# to accommodate the original non-numpy solution
pts_list = [list(pt) for pt in pts]

import timeit

# print('Old non-numpy solution:s\t{}'.format(
    # timeit.timeit('cull(pts_list, dominates)', number=3, globals=globals())))

print('Numpy solution:\t{}'.format(
    timeit.timeit('is_pareto_efficient(pts)', number=3, globals=globals())))

print('Convex hull heuristic:\t{}'.format(
    timeit.timeit('get_pareto_frontier(pts)', number=3, globals=globals())))

Results

# >>= python temp.py # 1,000 points
# Old non-numpy solution:      0.0316428339574486
# Numpy solution:              0.005961259012110531
# Convex hull heuristic:       0.012369581032544374
# >>= python temp.py # 1,000,000 points
# Old non-numpy solution:      70.67529802105855
# Numpy solution:              5.398462114972062
# Convex hull heuristic:       1.5286884519737214
# >>= python temp.py # 10,000,000 points
# Numpy solution:              98.03680767398328
# Convex hull heuristic:       10.203076395904645

Original post

I tried rewriting the same algorithm with a couple of tweaks. I think most of your problems come from inputPoints.remove(row): it requires searching through the list of points, so removing by index would be much more efficient. I also modified the dominates function to avoid some redundant comparisons, which can come in handy in higher dimensions.

def dominates(row, rowCandidate):
    return all(r >= rc for r, rc in zip(row, rowCandidate))

def cull(pts, dominates):
    dominated = []
    cleared = []
    remaining = pts
    while remaining:
        candidate = remaining[0]
        new_remaining = []
        for other in remaining[1:]:
            [new_remaining, dominated][dominates(candidate, other)].append(other)
        if not any(dominates(other, candidate) for other in new_remaining):
            cleared.append(candidate)
        else:
            dominated.append(candidate)
        remaining = new_remaining
    return cleared, dominated
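
For illustration, the rewritten cull can be run directly on the question's points (a usage sketch of my own; with the >= convention, exact duplicates end up on the dominated side):

pts = [
    [0.3296170319979843, 0.0, 0.44472108843537406],
    [0.3296170319979843, 0.0, 0.44472108843537406],
    [0.32920760896951373, 0.0, 0.4440408163265306],
    [0.32920760896951373, 0.0, 0.4440408163265306],
    [0.33815192743764166, 0.0, 0.44356462585034007],
]

cleared, dominated = cull(pts, dominates)
print(cleared)     # the non-dominated (Pareto) points
print(dominated)   # everything that was removed along the way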

Thanks, I'm trying it out right now. Do you know how it compares to the first answer here: https://dev59.com/4Hvaa4cB1Zd3GeqPFqBN? - Lucien S.
Thank you for your answer. It would also be nice to know how to pick the best point from the set of Pareto-front points. Besides, some objectives may need to be minimised, some maximised, and some kept constant. - Nav
@Nav I'm not sure I fully understand the suggestion. For any point on the Pareto front one can write an objective function under which that point is "the best". Are you thinking of multi-objective optimisation? That's not what I had in mind for this answer. - hilberts_drinking_problem
The code from the August 2019 update does not seem to work correctly - the returned set is not a valid frontier. For example, for this set (https://gist.github.com/vfilimonov/4e368a7a6235ae2aff8e9c47a569e974) many points are missing from the Pareto front. - Vladimir
@Vladimir, nice catch. It looks like the problem is with the strict inequality when testing for undominated points. This causes errors when two points share the same value in some coordinate, as in your example. Note that the updated outputs of is_pareto_efficient and keep_efficient will differ, because the point [4.8, 4.8] appears twice and my code does not handle duplicates. - hilberts_drinking_problem


Peter, great answer.

I just wanted to generalise it for those who want to choose between maximisation and your default of minimisation. It's a trivial fix, but worth documenting here:

def is_pareto(costs, maximise=False):
    """
    :param costs: An (n_points, n_costs) array
    :maximise: boolean. True for maximising, False for minimising
    :return: A (n_points, ) boolean array, indicating whether each point is Pareto efficient
    """
    is_efficient = np.ones(costs.shape[0], dtype = bool)
    for i, c in enumerate(costs):
        if is_efficient[i]:
            if maximise:
                is_efficient[is_efficient] = np.any(costs[is_efficient]>=c, axis=1)  # Remove dominated points
            else:
                is_efficient[is_efficient] = np.any(costs[is_efficient]<=c, axis=1)  # Remove dominated points
    return is_efficient
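
A small usage sketch with made-up data (my own example, not from the answer):

import numpy as np

costs = np.array([[0.5, 9.0],
                  [1.0, 5.0],
                  [2.0, 2.0],
                  [9.0, 0.5],
                  [3.0, 6.0]])

print(is_pareto(costs))                 # minimising: [3, 6] is dominated by [1, 5]
print(is_pareto(costs, maximise=True))  # maximising: [1, 5] and [2, 2] are dominated by [3, 6]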

The definition of dominates is incorrect. An object A dominates object B if and only if A is better than or equal to B in every dimension and strictly better in at least one dimension.
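
For reference, a pairwise check following that definition could look like this (a sketch of my own, named dominates_strict to avoid clashing with the functions above, using the question's maximisation convention):

def dominates_strict(row, candidate):
    # row dominates candidate iff it is at least as good in every objective
    # and strictly better in at least one (maximisation convention)
    return (all(r >= c for r, c in zip(row, candidate))
            and any(r > c for r, c in zip(row, candidate)))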



I may be a bit late to the party, but I tried the proposed solutions and it seems they do not return all of the Pareto points. I made a recursive implementation (noticeably faster) to find the Pareto front, which you can find at https://github.com/Ragheb2464/preto-front.


Please provide a counterexample. - kilojoules
Nice! Your solution worked for me; none of the others did (I couldn't figure out how to build the costs array). - Hugo de Heer


I found a bug in an earlier post; here is a new version of the keep_efficient function.

def keep_efficient(pts):
    'returns Pareto efficient row subset of pts'
    # sort points by decreasing sum of coordinates
    pts = pts[pts.sum(1).argsort()[::-1]]
    # initialize a boolean mask for undominated points
    # to avoid creating copies each iteration
    undominated = np.ones(pts.shape[0], dtype=bool)
    for i in range(pts.shape[0]):
        # process each point in turn
        n = pts.shape[0]
        if i >= n:
            break
        # find all points not dominated by i
        # since points are sorted by coordinate sum
        # i cannot dominate any points in 1,...,i-1
        undominated[i+1:n] = (pts[i+1:] >= pts[i]).any(1) 
        # keep points undominated so far
        pts = pts[undominated[:n]]
        undominated = np.array([True]*len(pts))

    return pts

Note: the bug in the earlier post was that keep_efficient(pts) returned the wrong Pareto front for the input pts = [[5,5],[4,3], [0,6]]. Before the edit the result was [5,5], while the expected result is [[5 5], [0 6]]. The fix is to add the following line at the end of the for loop: undominated = np.array([True]*len(pts))
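
A quick check against the example mentioned in that note (assuming numpy is imported as np and keep_efficient is the fixed version above):

import numpy as np

pts = np.array([[5, 5], [4, 3], [0, 6]])
print(keep_efficient(pts))
# expected, per the note above:
# [[5 5]
#  [0 6]]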


Just to be clear, the function for obtaining the Pareto front in the example above differs slightly from the code above: it should contain only a < rather than a <=, and looks like this:

def is_pareto(costs):
    is_efficient = np.ones(costs.shape[0], dtype=bool)

    for i, c in enumerate(costs):
        if is_efficient[i]:
            is_efficient[is_efficient] = np.any(costs[is_efficient]<c, axis=1)
            is_efficient[i] = True  # keep the current point; with strict < it would otherwise remove itself

    return is_efficient

Disclaimer: this is only partially correct, since dominance itself is defined as "<=" for all objectives and "<" for at least one. For most practical cases, though, this should be sufficient.
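
If the exact definition matters (<= in every objective and < in at least one), a straightforward, if slower, O(n^2) mask can be built like this (my own sketch for the minimisation case; it also keeps exact duplicates, since neither copy strictly dominates the other):

import numpy as np

def is_pareto_strict(costs):
    """Boolean mask of points not strictly dominated by any other point (minimisation)."""
    n = costs.shape[0]
    is_efficient = np.ones(n, dtype=bool)
    for i in range(n):
        # another point dominates i if it is <= in every cost and < in at least one
        dominated_by = np.all(costs <= costs[i], axis=1) & np.any(costs < costs[i], axis=1)
        if dominated_by.any():
            is_efficient[i] = False
    return is_efficient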

