合并向量的算法求助

3
我需要一个非常快的算法来完成以下任务。我已经实现了几个算法来完成它,但它们对于我所需的性能来说都太慢了。它应该足够快,以至于算法可以在现代CPU上每秒运行至少100,000次。它将在C++中实现。
我正在使用跨度/范围,这是一种具有线路上起始和结束坐标的结构。
我有两个跨度向量(动态数组),我需要合并它们。一个向量是src,另一个是dst。向量按跨度起始坐标排序,并且跨度在一个向量内不重叠。
src向量中的跨度必须与dst向量中的跨度合并,使得结果向量仍然排序并且没有重叠。即。如果在合并期间检测到重叠,则将两个跨度合并为一个。(合并两个跨度只是改变结构中的坐标。)
现在,还有一个问题,src向量中的跨度在合并期间必须“加宽”。这意味着会向src中的每个跨度的起始点添加一个常数,向其结束点添加另一个(更大的)常数。这意味着在src跨度拓宽后,它们可能会重叠。
到目前为止,我得出的结论是它不能完全在原地完成,需要某种临时存储。我认为它应该能够在线性时间内完成src和dst元素数量之和。
任何临时存储都可能在多次运行算法之间共享。
我尝试的两种主要方法速度都太慢:
1.将src的所有元素附加到dst中,在附加之前拓宽每个元素。然后运行一个原地排序。最后使用“读”和“写”指针迭代结果向量,其中读指针在写指针之前运行,合并跨度。当所有元素都已合并(读指针到达末尾)时,截断dst。
2.创建一个临时工作向量。通过重复从src或dst选择下一个元素并合并到工作向量中来执行简单合并。完成后,将工作向量复制到dst中,替换它。
第一种方法的问题在于排序是O((m + n)* log(m + n)),而不是O(m + n),并且具有一定的开销。这也意味着dst向量必须比实际需要的要大得多。
第二种方法的主要问题是大量的复制和再次分配/释放内存。
如果您认为需要,可以更改用于存储/管理跨度/向量的数据结构。
更新:忘记说数据集有多大。最常见的情况是每个向量中有4到30个元素,dst为空或src和dst中的跨度之间存在大量重叠。
9个回答

8
我们知道最佳情况下的运行时间是O(m+n),这是因为你至少需要扫描所有的数据才能合并列表。鉴于此,你的第二种方法应该给你那种类型的行为。
你有没有分析过你的第二种方法,找出瓶颈在哪里?根据你所谈论的数据量,很可能实际上无法在指定的时间内完成所需操作。一种验证的方法是做一些简单的事情,例如在循环中对每个向量中的跨度的开始值和结束值进行求和,并计时。基本上,在向量的每个元素上执行了最小量的工作。这将为你提供可以期望获得的最佳性能的基线。
此外,你可以通过使用STL交换方法避免逐个复制向量元素,并预先分配临时向量到一定大小,以避免在合并元素时触发数组扩展。
你可以考虑在系统中使用两个向量,每当需要进行合并时,都合并到未使用的向量中,然后交换(这类似于图形中使用的双缓冲)。这样,每次进行合并时就不必重新分配向量。
然而,你最好先进行分析,找出瓶颈在哪里。如果分配与实际合并过程相比较小,那么你需要找出如何使其更快。
一些可能的额外加速可以通过直接访问向量的原始数据来实现,这避免了对每个访问数据执行的边界检查。

谢谢你提醒我使用std::swap,这可能是决定性因素。测试后我会回来的 ;) - jfs

0
第二种方法如何?它不需要重复分配内存——也就是说,只需分配一次临时向量,以后就不用再分配了。或者,如果输入向量足够小(但大小不固定),可以使用alloca而不是malloc。
此外,在速度方面,您可能希望确保您的代码在排序时使用CMOV,因为如果代码实际上在每个归并排序迭代中都进行分支操作:
if(src1[x] < src2[x])
    dst[x] = src1[x];
else
    dst[x] = src2[x];

分支预测有50%的失败率,这会对性能产生巨大影响。条件移动可能会更好,因此请确保编译器正在执行此操作,如果没有,请尝试劝说它这样做。

0

我为这个算法编写了一个新的容器类,以满足需求。这也给了我一个机会来调整程序中的其他代码,同时也提高了一些速度。

这比使用STL向量的旧实现要快得多,但基本上是相同的东西。但尽管它更快,但仍然不够快...不幸的是。

分析已经不能再揭示真正的瓶颈了。MSVC分析器有时似乎会将“责任”归咎于错误的调用(据说相同的运行分配了广泛不同的运行时间),并且大多数调用都被合并成一个大块。

查看生成的代码的反汇编显示,生成的代码中有大量的跳转,我认为这可能是现在缓慢的主要原因。

class SpanBuffer {
private:
    int *data;
    size_t allocated_size;
    size_t count;

    inline void EnsureSpace()
    {
        if (count == allocated_size)
            Reserve(count*2);
    }

public:
    struct Span {
        int start, end;
    };

public:
    SpanBuffer()
        : data(0)
        , allocated_size(24)
        , count(0)
    {
        data = new int[allocated_size];
    }

    SpanBuffer(const SpanBuffer &src)
        : data(0)
        , allocated_size(src.allocated_size)
        , count(src.count)
    {
        data = new int[allocated_size];
        memcpy(data, src.data, sizeof(int)*count);
    }

    ~SpanBuffer()
    {
        delete [] data;
    }

    inline void AddIntersection(int x)
    {
        EnsureSpace();
        data[count++] = x;
    }

    inline void AddSpan(int s, int e)
    {
        assert((count & 1) == 0);
        assert(s >= 0);
        assert(e >= 0);
        EnsureSpace();
        data[count] = s;
        data[count+1] = e;
        count += 2;
    }

    inline void Clear()
    {
        count = 0;
    }

    inline size_t GetCount() const
    {
        return count;
    }

    inline int GetIntersection(size_t i) const
    {
        return data[i];
    }

    inline const Span * GetSpanIteratorBegin() const
    {
        assert((count & 1) == 0);
        return reinterpret_cast<const Span *>(data);
    }

    inline Span * GetSpanIteratorBegin()
    {
        assert((count & 1) == 0);
        return reinterpret_cast<Span *>(data);
    }

    inline const Span * GetSpanIteratorEnd() const
    {
        assert((count & 1) == 0);
        return reinterpret_cast<const Span *>(data+count);
    }

    inline Span * GetSpanIteratorEnd()
    {
        assert((count & 1) == 0);
        return reinterpret_cast<Span *>(data+count);
    }

    inline void MergeOrAddSpan(int s, int e)
    {
        assert((count & 1) == 0);
        assert(s >= 0);
        assert(e >= 0);

        if (count == 0)
        {
            AddSpan(s, e);
            return;
        }

        int *lastspan = data + count-2;

        if (s > lastspan[1])
        {
            AddSpan(s, e);
        }
        else
        {
            if (s < lastspan[0])
                lastspan[0] = s;
            if (e > lastspan[1])
                lastspan[1] = e;
        }
    }

    inline void Reserve(size_t minsize)
    {
        if (minsize <= allocated_size)
            return;

        int *newdata = new int[minsize];

        memcpy(newdata, data, sizeof(int)*count);

        delete [] data;
        data = newdata;

        allocated_size = minsize;
    }

    inline void SortIntersections()
    {
        assert((count & 1) == 0);
        std::sort(data, data+count, std::less<int>());
        assert((count & 1) == 0);
    }

    inline void Swap(SpanBuffer &other)
    {
        std::swap(data, other.data);
        std::swap(allocated_size, other.allocated_size);
        std::swap(count, other.count);
    }
};


struct ShapeWidener {
    // How much to widen in the X direction
    int widen_by;
    // Half of width difference of src and dst (width of the border being produced)
    int xofs;

    // Temporary storage for OverlayScanline, so it doesn't need to reallocate for each call
    SpanBuffer buffer;

    inline void OverlayScanline(const SpanBuffer &src, SpanBuffer &dst);

    ShapeWidener(int _xofs) : xofs(_xofs) { }
};


inline void ShapeWidener::OverlayScanline(const SpanBuffer &src, SpanBuffer &dst)
{
    if (src.GetCount() == 0) return;
    if (src.GetCount() + dst.GetCount() == 0) return;

    assert((src.GetCount() & 1) == 0);
    assert((dst.GetCount() & 1) == 0);

    assert(buffer.GetCount() == 0);

    dst.Swap(buffer);

    const int widen_s = xofs - widen_by;
    const int widen_e = xofs + widen_by;

    size_t resta = src.GetCount()/2;
    size_t restb = buffer.GetCount()/2;
    const SpanBuffer::Span *spa = src.GetSpanIteratorBegin();
    const SpanBuffer::Span *spb = buffer.GetSpanIteratorBegin();

    while (resta > 0 || restb > 0)
    {
        if (restb == 0)
        {
            dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e);
            --resta, ++spa;
        }
        else if (resta == 0)
        {
            dst.MergeOrAddSpan(spb->start, spb->end);
            --restb, ++spb;
        }
        else if (spa->start < spb->start)
        {
            dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e);
            --resta, ++spa;
        }
        else
        {
            dst.MergeOrAddSpan(spb->start, spb->end);
            --restb, ++spb;
        }
    }

    buffer.Clear();
}

尝试使用deque而不是vector。Deque进行的内存分配较少,但代价是它不是连续的内存。 - moswald

0

你在第一种方法中提到的排序可以被降低到线性时间(从你描述的对数线性)。因为这两个输入列表已经排好序了。只需执行归并排序的合并步骤即可。通过适当的输入跨度向量表示(例如单链表),这可以在原地完成。

http://en.wikipedia.org/wiki/Merge_sort


0

我认为严格线性解决方案是不可能的,因为扩大源向量跨度可能会在最坏情况下导致它们全部重叠(取决于您添加的常数的大小)

问题可能出在实现上,而不是算法本身;建议对之前的解决方案进行代码分析,查看时间花在了哪里

原因:

对于像 Intel Core 2 Extreme QX9770 这样真正“现代”的 CPU,运行速度达到 3.2GHz,预计可以达到约 59,455 MIPS。

对于 100,000 个向量,您需要在594,550个指令中处理每个向量。这是非常多的指令。

参考:wikipedia MIPS

此外,请注意,将一个常数添加到源向量范围并不会对其进行重新排序,因此您可以独立地规范化源向量范围,然后将其与目标向量范围合并;这应该能够减轻原始算法的工作负载。


10万可能实际上不足以支撑,我没有真正计算过数字。另外,当我说“现代”CPU时,我实际上是想到“最多5年前的”Athlon XP 3000+,这并非是一个不切实际的目标。 - jfs

0

1已经被淘汰了——完整的排序比合并两个已排序列表要慢。

所以你需要调整2(或者使用全新的方法)。

如果你将数据结构改为双向链表,那么你就可以在恒定的工作空间内合并它们。

对于列表节点,使用固定大小的堆分配器,既可以减少每个节点的内存使用量,也可以提高节点在内存中靠近的机会,从而减少页面缺失。

你可能能够在网上或你喜欢的算法书中找到优化链表合并的代码。你需要自定义它,以便同时进行跨度合并和列表合并。

为了优化合并,首先注意到对于每个值的运行,如果没有来自另一侧的值,则可以一次性将整个运行插入到目标列表中,而不是逐个插入每个节点。通过让末尾“悬空”,知道稍后会修补它,可以节省一个写操作。只要你的应用程序中没有在其他地方进行删除,列表就可以是单向链接的,这意味着每个节点只需要一次写操作。

至于10微秒的运行时间——这有点取决于n和m的大小……


0

如果你最近的实现仍然不够快,你可能需要考虑其他方法。

你用这个函数的输出做什么?


你缺少了一件事,那就是不只有一个“delta”,而是两个。“span”的左右两侧具有不同的值,特别是右侧比左侧添加了更大的值。 - jfs

0

我会一直保持我的跨度向量排序。这使得实现算法变得更加容易--并且可以在线性时间内完成。

好的,所以我会根据以下方式对跨度进行排序:

  • 按递增顺序排列跨度最小值
  • 然后按递减顺序排列跨度最大值

你需要创建一个函数来执行此操作。

然后我会使用std::set_union合并向量(您可以在继续之前合并多个向量)。

然后对于每个具有相同最小值的连续跨度集,保留第一个并删除其余部分(它们是第一个跨度的子跨度)。

然后您需要合并您的跨度。现在应该很容易做到,并且可以在线性时间内完成。

好的,现在的技巧是不要尝试原地执行此操作。使用一个或多个临时向量(并预先保留足够的空间)。然后在结束时,调用std::vector::swap将结果放入您选择的输入向量中。

希望这足以让您开始了解。


0

你的目标系统是什么?它支持多核吗?如果是,你可以考虑使用多线程来实现这个算法。


我的目标系统是最近五年内的桌面系统,我不能假设关于 SMP 支持或 SIMD 指令集的任何内容。(嗯,我可以假设在 x86 上有 MMX,但就这些。) - jfs

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接