在C++中执行向量交集

5

我有一个无符号整数的向量向量。我需要找到所有这些无符号整数向量的交集,为此我编写了以下代码:

int func()
{
   vector<vector<unsigned> > t;
   vector<unsigned> intersectedValues;
   bool firstIntersection=true;
   for(int i=0;i<(t).size();i++)
   {
       if(firstIntersection)
       {
           intersectedValues=t[0];
           firstIntersection=false;
       }else{
           vector<unsigned> tempIntersectedSubjects;                                                              
           set_intersection(t[i].begin(),
                  t[i].end(), intersectedValues.begin(),
                  intersectedValues.end(),
                  std::inserter(tempIntersectedSubjects, tempIntersectedSubjects.begin()));
           intersectedValues=tempIntersectedSubjects;
       }         
       if(intersectedValues.size()==0)
           break;
   }               
}

每个向量都有9000个元素,在“t”中有很多这样的向量。当我对我的代码进行分析时,发现set_intersection占用了最长的时间,因此在func()被多次调用时会导致代码变慢。请问有人可以建议如何使代码更加有效率吗?
我正在使用:gcc(GCC)4.8.2 20140120(Red Hat 4.8.2-15)
编辑:vector“t”中的每个向量都已排序。

你的向量已经排序了吗?此外,你可以尝试将找到的元素插入到末尾而不是开头。 - Galik
@Galik 是的,向量已经排序。 - Steg Verner
你知道交集的大小吗?如果预计交集很“小”,那么算法可能会改变。 - Mark Lakata
@StegVerner 您列出了编译器,但您没有发布所使用的编译器选项。如果您正在对未经优化的代码进行分析,则在构建程序的优化版本之前,您的结果是没有意义的。 - PaulMcKenzie
@PaulMcKenzie 我正在使用的编译器选项是O3。 - Steg Verner
5个回答

3

我没有一个用于分析操作的框架,但我肯定会更改代码,以重复使用已分配的向量。此外,我会将初始交集移出循环。另外,std :: back_inserter()应该确保在正确位置添加元素而不是在开头:

int func()
{
    vector<vector<unsigned> > t = some_initialization();
    if (t.empty()) {
        return;
    }
    vector<unsigned> intersectedValues(t[0]);
    vector<unsigned> tempIntersectedSubjects;
    for (std::vector<std::vector<unsigned>>::size_type i(1u);
         i < t.size() && !intersectedValues.empty(); ++i) {
        std::set_intersection(t[i].begin(), t[i].end(),
                              intersectedValues.begin(), intersectedValues.end(),
                             std::back_inserter(tempIntersectedSubjects);
        std::swap(intersectedValues, tempIntersectedSubjects);
        tempIntersectedSubjects.clear();
    }
}               

我认为这段代码有很大可能性更快。另外,交集的方式也可以不同:不是保留一个集合并与之相交,而是对相邻的集合创建新的交集,然后将第一个集合与它们各自相邻的集合相交:

std::vector<std::vector<unsigned>> intersections(
    std::vector<std::vector<unsigned>> const& t) {
    std::vector<std::vector<unsigned>> r;
    std::vector<std::vector<unsignned>>::size_type i(0);
    for (; i + 1 < t.size(); i += 2) {
        r.push_back(intersect(t[i], t[i + 1]));
    }
    if (i < t.size()) {
        r.push_back(t[i]);
    }
    return r;
}

std::vector<unsigned> func(std::vector<std::vector<unsigned>> const& t) {
    if (t.empty()) { /* deal with t being empty... */ }
    std::vector<std::vector<unsigned>> r(intersections(t))
    return r.size() == 1? r[0]: func(r);
}

当然,实际上你不会像这样实现它: 你会使用 Stepanov 的二进制计数器来保持中间集合。这种方法假设结果很可能非空。如果期望结果为空,则可能不是一种改进。

根据 OP,for 循环内部似乎需要重新初始化 intersectedValues 变量。 - Steephen

2
我无法进行测试,但或许像这样做会更快一些?
int func()
{
   vector<vector<unsigned> > t;
   vector<unsigned> intersectedValues;

   // remove if() branching from loop

   if(t.empty())
       return -1;

   intersectedValues = t[0];

   // now start from 1
   for(size_t i = 1; i < t.size(); ++i)
   {
       vector<unsigned> tempIntersectedSubjects;
       tempIntersectedSubjects.reserve(intersectedValues.size()); // pre-allocate

       // insert at end() not begin()
       set_intersection(t[i].begin(),
              t[i].end(), intersectedValues.begin(),
              intersectedValues.end(),
              std::inserter(tempIntersectedSubjects, tempIntersectedSubjects.end()));

       // as these are not used again you can move them rather than copy
       intersectedValues = std::move(tempIntersectedSubjects);

       if(intersectedValues.empty())
           break;
   }
   return 0;
}

另一种可能性:

考虑使用swap()可以优化数据交换并消除重新分配的需要。此外,临时构造函数可以移出循环。

int func()
{
   vector<vector<unsigned> > t;
   vector<unsigned> intersectedValues;

   // remove if() branching from loop

   if(t.empty())
       return -1;

   intersectedValues = t[0];

   // no need to construct this every loop
   vector<unsigned> tempIntersectedSubjects;

   // now start from 1
   for(size_t i = 1; i < t.size(); ++i)
   {
       // should already be the correct size from previous loop
       // but just in case this should be cheep
       // (profile removing this line)
       tempIntersectedSubjects.reserve(intersectedValues.size());

       // insert at end() not begin()
       set_intersection(t[i].begin(),
              t[i].end(), intersectedValues.begin(),
              intersectedValues.end(),
              std::inserter(tempIntersectedSubjects, tempIntersectedSubjects.end()));

       // swap should leave tempIntersectedSubjects preallocated to the
       // correct size
       intersectedValues.swap(tempIntersectedSubjects);
       tempIntersectedSubjects.clear(); // will not deallocate

       if(intersectedValues.empty())
           break;
   }
   return 0;
}

默认情况下,std::vector是否调用移动赋值运算符?我们需要调用std::move()吗? - Steephen
1
@Steephen 不会,因为tempIntersectedSubjects是一个左值(它是一个命名变量)。移动语义只对编译器知道永远不会再次使用的未命名临时对象自动执行。 - Galik
1
@Steephen 我有一个新想法 - 使用 swap() 可能会更有效(已添加到答案中) - Galik
简单的for循环使用emplace_back()比swap更好,我在我的代码中观察到了这一点! - Steephen
@Steephen 真的吗?我认为 swap()O(1) - 简单交换数据指针? - Galik

2

您可以通过在编译时定义_GLIBCXX_PARALLEL来使std::set_intersection以及其他许多标准库算法并行运行。这可能具有最佳的工作效益比。有关文档,请参见此处

必要的陷阱警告:

请注意,定义_GLIBCXX_PARALLEL可能会更改诸如std::search之类的标准类模板的大小和行为,因此只能在两个翻译单元之间传递未实例化的容器,才能同时链接使用并行模式和不使用并行模式的代码。并行模式功能具有不同的链接关系,不能与正常模式符号混淆。

来自这里

另一个简单但可能微不足道的优化是在填充向量之前保留足够的空间。

此外,请尝试确定是否将值插入到后面而不是前面,然后将向量翻转是否有帮助。(虽然我甚至认为您的代码现在是错误的,您的intersectedValues排序方式不正确。如果我没弄错的话,您应该使用std::back_inserter而不是std::inserter(...,begin),然后不要反转。)虽然通过内存移动数据很快,但不需要移动更快。


我在哪里定义_GLIBCXX_PARALLEL?我是新手,请您能帮我解决一下吗? - Steg Verner
@StegVerner 使用-D_GLIBCXX_PARALLEL编译。 - Baum mit Augen
我正在使用NetBeans,如何指示NetBeans使用此选项? - Steg Verner
@StegVerner 我不知道,我从未使用过它。阅读文档或谷歌一下,如果它是一个像样的 IDE,应该有类似于“构建选项”的东西,您可以在其中添加定义。 - Baum mit Augen
+1 用于预留向量内存。你知道 tempIntersectedSubjects 将小于或等于 intersectedValues - Mark Lakata

1
使用emplace_back()在for循环中从向量复制元素可以节省时间。如果更改for循环的迭代器索引,则无需标志。因此,可以优化for循环,并且可以删除每次迭代的条件检查。
void func()
{
   vector<vector<unsigned > > t;
   vector<unsigned int > intersectedValues;

   for(unsigned int i=1;i<(t).size();i++)
   {
        intersectedValues=t[0];

        vector<unsigned > tempIntersectedSubjects;                                                              
        set_intersection(t[i].begin(),
                  t[i].end(), intersectedValues.begin(),
                  intersectedValues.end(),
                   std::back_inserter(tempIntersectedSubjects);
        for(auto &ele: tempIntersectedSubjects)
                intersectedValues.emplace_back(ele);

        if( intersectedValues.empty())
          break;
   }               
}

1

set::set_intersection对于大向量来说可能会比较慢。可以使用lower_bound创建类似的函数。像这样:

template<typename Iterator1, typename Iterator2, typename Function>
void lower_bound_intersection(Iterator1 begin_1, Iterator1 end_1, Iterator2 begin_2, Iterator2 end_2, Function func)
{
    for (; begin_1 != end_1 && begin_2 != end_2;)
    {
        if (*begin_1 < *begin_2)
        {
            begin_1 = begin_1.lower_bound(*begin_2);
            //++begin_1;
        }
        else if (*begin_2 < *begin_1)
        {
            begin_2 = begin_2.lower_bound(*begin_1);
            //++begin_2;
        }
        else // equivalent
        {
            func(*begin_1);
            ++begin_1;
            ++begin_2;
        }
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接