生成过滤的二进制笛卡尔积。

12

问题陈述

我正在寻找一种高效的方法来生成完整的二进制笛卡尔积(包含所有True和False组合的表格,具有特定列数),并通过某些排除条件进行过滤。例如,对于三列/位 n=3,我们将得到完整的表格。

df_combs = pd.DataFrame(itertools.product(*([[True, False]] * n)))
       0      1      2
0   True   True   True
1   True   True  False
2   True  False   True
3   True  False  False
...

这应该由定义互斥组合的字典来过滤,如下所示:

mutually_excl = [{0: False, 1: False, 2: True},
                 {0: True, 2: True}]

这里的键表示上面表格中的列。例子可以理解为:

  • 如果0是False,1也是False,那么2就不能是True
  • 如果0是True,那么2就不能是True

基于这些过滤条件,预期输出结果为:

       0      1      2
1   True   True  False
3   True  False  False
4  False   True   True
5  False   True  False
7  False  False  False

在我的使用情况中,筛选后的表格比完整的笛卡尔积小几个数量级(例如,大约1000而不是2 ** 24(16777216))。
以下是我目前的三种解决方案,每种都有其优缺点,在最后进行讨论。
import random
import pandas as pd
import itertools
import wrapt
import time
import operator
import functools

def get_mutually_excl(n, nfilt):  # generate random example filter
    ''' Example: `get_mutually_excl(9, 2)` creates a list of two filters with
    maximum index `n=9` and each filter length between 2 and `int(n/3)`:
    `[{1: True, 2: False}, {3: False, 2: True, 6: False}]` '''
    random.seed(2)
    return [{random.choice(range(n)): random.choice([True, False])
                           for _ in range(random.randint(2, int(n/3)))}
                           for _ in range(nfilt)]

@wrapt.decorator
def timediff(f, _, args, kwargs):
    t = time.perf_counter()
    res = f(*args)
    return res, time.perf_counter() - t

解决方案1:先过滤再合并。

将每个单一的过滤条目(例如{0: True, 2: True})扩展为一个子表,其中列对应于此过滤条目中的索引([0, 2])。从这个子表中删除单个过滤行([True, True])。与完整表合并以获取过滤后组合的完整列表。

@timediff
def make_df_comb_filt_merge(n, nfilt):

    mutually_excl = get_mutually_excl(n, nfilt)

    # determine missing (unfiltered) columns
    cols_missing = set(range(n)) - set(itertools.chain.from_iterable(mutually_excl))

    # complete dataframe of unfiltered columns with column "temp" for full outer merge
    df_comb = pd.DataFrame(itertools.product(*([[True, False]] * len(cols_missing))),
                            columns=cols_missing).assign(temp=1)

    for filt in mutually_excl:  # loop through individual filters

        # get columns and bool values of this filters as two tuples with same order
        list_col, list_bool = zip(*filt.items())

        # construct dataframe
        df = pd.DataFrame(itertools.product(*([[True, False]] * len(list_col))),
                                columns=list_col)

        # filter remove a *single* row (by definition)
        df = df.loc[df.apply(tuple, axis=1) != list_bool]

        # determine which rows to merge on
        merge_cols = list(set(df.columns) & set(df_comb.columns))
        if not merge_cols:
            merge_cols = ['temp']
            df['temp'] = 1

        # merge with full dataframe
        df_comb = pd.merge(df_comb, df, on=merge_cols)

    df_comb.drop('temp', axis=1, inplace=True)
    df_comb = df_comb[range(n)]
    df_comb = df_comb.sort_values(df_comb.columns.tolist(), ascending=False)

    return df_comb.reset_index(drop=True)

解决方案2:完全展开,然后过滤

生成完整笛卡尔积的DataFrame:整个过程将在内存中完成。循环遍历过滤器并为每个过滤器创建掩码。将每个掩码应用于表格。


@timediff
def make_df_comb_exp_filt(n, nfilt):

    mutually_excl = get_mutually_excl(n, nfilt)

    # expand all bool combinations into dataframe
    df_comb = pd.DataFrame(itertools.product(*([[True, False]] * n)),
                           dtype=bool)

    for filt in mutually_excl:

        # generate total filter mask for given excluded combination
        mask = pd.Series(True, index=df_comb.index)
        for col, bool_act in filt.items():
            mask = mask & (df_comb[col] == bool_act)

        # filter dataframe
        df_comb = df_comb.loc[~mask]

    return df_comb.reset_index(drop=True)

解决方案3:筛选迭代器

将完整的笛卡尔积保留为迭代器。循环遍历每一行时,检查该行是否被任何一个筛选器排除。

@timediff
def make_df_iter_filt(n, nfilt):

    mutually_excl = get_mutually_excl(n, nfilt)

    # switch to [[(1, 13), (True, False)], [(4, 9), (False, True)], ...]
    mutually_excl_index = [list(zip(*comb.items()))
                                for comb in mutually_excl]

    # create iterator
    combs_iter = itertools.product(*([[True, False]] * n))

    @functools.lru_cache(maxsize=1024, typed=True)  # small benefit
    def get_getter(list_):
        # Used to access combs_iter row values as indexed by the filter
        return operator.itemgetter(*list_)

    def check_comb(comb_inp, comb_check):
        return get_getter(comb_check[0])(comb_inp) == comb_check[1]

    # loop through the iterator
    # drop row if any of the filter matches
    df_comb = pd.DataFrame([comb_inp for comb_inp in combs_iter
                       if not any(check_comb(comb_inp, comb_check)
                                  for comb_check in mutually_excl_index)])

    return df_comb.reset_index(drop=True)

运行示例

dict_time = dict.fromkeys(itertools.product(range(16, 23, 2), range(3, 20)))

for n, nfilt in dict_time:
    dict_time[(n, nfilt)] = {'exp_filt': make_df_comb_exp_filt(n, nfilt)[1],
                             'filt_merge': make_df_comb_filt_merge(n, nfilt)[1],
                             'iter_filt': make_df_iter_filt(n, nfilt)[1]}

分析

import seaborn as sns
import matplotlib.pyplot as plt

df_time = pd.DataFrame.from_dict(dict_time, orient='index',
                                 ).rename_axis(["n", "nfilt"]
                                 ).stack().reset_index().rename(columns={'level_2': 'solution', 0: 'time'})

g = sns.FacetGrid(df_time.query('n in %s' % str([16,18,20,22])),
                  col="n",  hue="solution", sharey=False)
g = (g.map(plt.plot, "nfilt", "time", marker="o").add_legend())

enter image description here

方案3: 基于迭代器的方法(comb_iterator)运行时间差,但没有明显的内存使用。我觉得还有改进的空间,尽管不可避免的循环可能会在运行时间方面施加严格限制。 方案2: 将完整的笛卡尔积扩展为DataFrame(exp_filt)会导致内存显著增加,我希望避免这种情况。运行时间还可以接受。 方案1: 合并从各个过滤器创建的数据框(filt_merge)似乎是我的实际应用的好解决方案(请注意,随着过滤器数量的增加,运行时间减少,这是由于较小的cols_missing表所导致的)。然而,这种方法并不完全令人满意:如果单个过滤器包括所有列,则整个笛卡尔积(2**n)将最终出现在内存中,使此解决方案比comb_iterator更糟糕。

问题:还有其他想法吗?一个疯狂而聪明的numpy两行代码?迭代器方法是否可以得到改进?


1
约束求解器可能会比这些方法表现更好,因为它们通过减少搜索空间来找到这些解决方案。也许可以看一下or-tools。这里有一个SAT的例子。 - ayhan
1
@ayhan,我尝试了(见答案)。这是一个有趣的方法,但不适用于一般解决方案。感谢您的贡献。我学到了东西 :) - mcsoini
这听起来像是一个可满足性问题,所以如果问题足够大,你应该一定要使用求解器。你也可以尝试https://or.stackexchange.com。 - Stradivari
@Stradivari公式作为一个SAT问题的表述确实是有意义的。但我不喜欢这种方法对滤波器数量的强依赖性。可能是我没有正确地访问解决方案。既然你熟悉or-tools,也许你想看看我的相应问题...它仍然缺少一个被接受的答案 ;) - mcsoini
2个回答

1

根据@ayhan的评论,我实现了一个基于or-tools的SAT解决方案。虽然这个想法很好,但是对于更多的二进制变量来说会遇到困难。我怀疑这与大规模IP问题类似,这也不是一件容易的事情。然而,对过滤器数量的强依赖性可能使其成为某些参数配置的有效选项。但作为一般解决方案,我不会使用它。

from ortools.sat.python import cp_model

class VarArraySolutionCollector(cp_model.CpSolverSolutionCallback):

    def __init__(self, variables):
        cp_model.CpSolverSolutionCallback.__init__(self)
        self.__variables = variables
        self.solution_list = []

    def on_solution_callback(self):
        self.solution_list.append([self.Value(v) for v in self.__variables])


@timediff
def make_df_comb_sat(n, nfilt):

    mutually_excl = get_mutually_excl(n, nfilt)

    model = cp_model.CpModel()

    make_var_name = 'x{:02d}'.format
    vrs = dict.fromkeys(map(make_var_name, range(n)))
    for var_name in vrs:
        vrs[var_name] = model.NewBoolVar(var_name)

    for filt in mutually_excl:
        list_expr = [vrs[make_var_name(iv)]
                     if not bool_ else getattr(vrs[make_var_name(iv)], 'Not')()
                     for iv, bool_ in filt.items()]
        model.AddBoolOr(list_expr)

    solver = cp_model.CpSolver()
    solution_printer = VarArraySolutionCollector(vrs.values())
    solver.SearchForAllSolutions(model, solution_printer)

    df_comb = pd.DataFrame(solution_printer.solution_list).astype(bool)
    df_comb = df_comb.sort_values(df_comb.columns.tolist(), ascending=False)
    df_comb = df_comb.reset_index(drop=True)

    return df_comb

enter image description here


1
尝试计时以下内容:
def in_filter(arr, arr_filt, n):
    return ((arr[:, None] >> (n-1-arr_filt[:, 0])) & 1 == arr_filt[:, 1]).all(axis=1)

def bits_to_boolean(arr, n):
    return ((arr[:, None] >> np.arange(n, dtype=arr.dtype)[::-1]) & 1).astype(bool)

@timediff
def recursive_filter(n, nfilt, dtype='uint32'):
    filts = get_mutually_excl(n, nfilt)
    out = np.arange(2**n, dtype=dtype)
    for filt in filts:
        arr_filt = np.array(list(filt.items()))
        out = out[~in_filter(out, arr_filt, n)]
    return bits_to_boolean(out, n)[::-1]

它将笛卡尔积视为编码在整数范围内的位,使用向量化函数递归地删除具有与给定过滤器匹配的位序列的数字。0..<2 ** n。相比于分配所有[True,False]笛卡尔积,内存效率更高,因为每个布尔值至少会存储8位(使用了比所需多7位),但是它将使用比基于迭代器的方法更多的内存。如果您需要解决大型n的问题,可以通过一次分配和操作一个子范围来拆分此任务。我在第一次实现中就这样做了,但对于n<=22没有太多好处,而且需要计算输出数组的大小,当存在重叠过滤器时会变得复杂。

这真是太棒了! - mcsoini

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接