管理C++ STL POD类型向量回收的简便方法

3

我的应用程序涉及对数十个函数进行数百万次调用。在这些函数中,会初始化并使用一个或多个临时的std::vector容器,此类容器包含POD(普通数据)类型,然后这些容器会被销毁。通过对代码进行分析,我发现分配和释放内存会导致巨大的开销。

一种懒惰的解决方案是将所有的函数重写为包含那些临时缓冲区容器的函数对象。但是这会使内存消耗增加,因为函数数量很多,且缓冲区大小并不是微不足道的。

更好的方法是分析代码,收集所有的缓冲区,预判如何最大程度地重复使用它们,并将最小的一组共享缓冲区容器作为参数提供给函数。但这可能太麻烦了。

我希望可以解决这个问题,在未来的开发中,只要需要临时POD缓冲区就能解决,而无需进行太多预谋。我的想法是实现一个容器端口,并将其引用作为每个可能需要临时缓冲区的函数的参数。在这些函数内部,应该能够从端口中获取任何POD类型的容器,而端口也应在函数返回之前自动回收这些容器。

// Port of vectors of POD types.
struct PODvectorPort 
{
  std::size_t Nlent; // Number of dispatched containers.
  std::vector<std::vector<std::size_t> > X; // Container pool.
  PODvectorPort() { Nlent = 0; }
};


// Functor that manages the port.
struct PODvectorPortOffice
{
  std::size_t initialNlent; // Number of already-dispatched containers
  // when the office is set up.
  PODvectorPort *p; // Pointer to the port.
  
  
  PODvectorPortOffice(PODvectorPort &port) 
  { 
    p = &port; 
    initialNlent = p->Nlent;
  }
  
  
  template<typename X, typename Y>
  std::vector<X> & repaint(std::vector<Y> &y) // Repaint the container.
  {
    // return *((std::vector<X>*)(&y)); // UB although works
    std::vector<X> *rst = nullptr;
    std::memcpy(&rst, &y, std::min(
      sizeof(std::vector<X>*), sizeof(std::vector<Y>*)));
    return *rst; // guess it makes no difference. Should still be UB.
  }
  
  
  template<typename T>
  std::vector<T> & lend()
  {
    ++p->Nlent;
    // Ensure sufficient container pool size:
    while (p->X.size() < p->Nlent) p->X.push_back( std::vector<size_t>(0) );
    return repaint<T, std::size_t>( p->X[p->Nlent - 1] );
  }

  
  void recall() { p->Nlent = initialNlent; }
  ~PODvectorPortOffice() { recall(); }
};


struct ArbitraryPODstruct
{
  char a[11]; short b[7]; int c[5]; float d[3]; double e[2];
};


// Example f1():
// f2(), f3(), ..., f50() are similarly defined.
// All functions are called a few million times in certain 
// order in main(). 
// port is defined in main().
void f1(other arguments..., PODvectorPort &port)
{
  
  PODvectorPort portOffice(port);
  
  // Oh, I need a buffer of chars:
  std::vector<char> &tmpchar = portOffice.lend();
  tmpchar.resize(789); // Trivial if container already has sufficient capacity.
  
  // ... do things
  
  // Oh, I need a buffer of shorts:
  std::vector<short> &tmpshort = portOffice.lend();
  tmpshort.resize(456);  // Trivial if container already has sufficient capacity.
  
  // ... do things.
  
  // Oh, I need a buffer of ArbitraryPODstruct:
  std::vector<ArbitraryPODstruct> &tmpArb = portOffice.lend();
  tmpArb.resize(123);  // Trivial if container already has sufficient capacity.
  
  // ... do things.
  
  // Oh, I need a buffer of integers, but also tmpArb is no longer 
  // needed. Why waste it? Cache hot.
  std::vector<int> &tmpint = portOffice.repaint(tmpArb);
  tmpint.resize(300); // Trivial.
  
  // ... do things.
}

尽管代码可以通过gcc-8.3MSVS 2019使用-O2-Ofast进行编译,并通过所有选项的广泛测试,但由于PODvectorPortOffice :: repaint()的hacky性质,“转换”向量类型原地,我希望能够得到批评。
上述代码正确且有效的一组充分条件(但不是必要条件)为:
  1. std::vector<T>存储底层缓冲区的&[0]&[0] + .size()&[0] + .capacity()这3个指针。
  2. std::vector<T>的配置器调用了malloc()
  3. malloc()返回8字节(或sizeof(std::size_t))对齐的地址。
因此,如果这对您来说是不可接受的,那么有没有现代化的正当方式可以解决我的需求?是否有一种方法编写管理器,只能实现我的代码而不违反标准?
谢谢!
// -std=c++17
#include <cstring>
#include <cstddef>
#include <iostream>
#include <vector>
#include <chrono>


// POD: plain old data.
// Idea: design a class that can let you maximally reuse temporary 
// containers during a program.
// Port of vectors of POD types.
template <std::size_t portsize = 42>
class PODvectorPort 
{
  static constexpr std::size_t Xsize = portsize;
  std::size_t signature;
  std::size_t Nlent; // Number of dispatched containers.
  std::vector<std::size_t> X[portsize]; // Container pool.
  PODvectorPort(const PODvectorPort &);
  PODvectorPort & operator=( const PODvectorPort& );
  
  
public:
  std::size_t Ndispatched() { return Nlent; }
  std::size_t showSignature() { return signature; }
  PODvectorPort() // Permuted random number generator.
  { 
    std::size_t state = std::chrono::high_resolution_clock::now().time_since_epoch().count();
    state ^= (uint64_t)(&std::memmove);
    signature = ((state >> 18) ^ state) >> 27;
    std::size_t rot = state >> 59;
    signature = (signature >> rot) | (state << ((-rot) & 31));
    Nlent = 0; 
  }
  
  
  template<typename podvecport>
  friend class PODvectorPortOffice;
};


// Functor that manages the port.
template<typename podvecport>
class PODvectorPortOffice
{
  // Number of already-dispatched containers when the office is set up.
  std::size_t initialNlent; 
  podvecport *p; // Pointer to the port.
  
  
  PODvectorPortOffice( const PODvectorPortOffice& ); // non construction-copyable
  PODvectorPortOffice& operator=( const PODvectorPortOffice& ); // non copyable
  
  
  constexpr void check()
  {
    while (__cplusplus < 201703)
    {
      std::cerr << "PODvectorPortOffice: C++ < 17, Stall." << std::endl;
    }
    
    
    // Check if allocation will be 8-byte (or more) aligned.
    // Intend it not to work on machine < 64-bit.
    constexpr std::size_t aln = alignof(std::max_align_t);
    while (aln < 8)
    {
      std::cerr << "PODvectorPortOffice: Allocation is not at least 8-byte aligned, Stall." <<
        std::endl;
    }
    
    
    while ((aln & (aln - 1)) != 0)
    {
      std::cerr << "PODvectorPortOffice: Alignment is not a power of 2 bytes. Stall." << std::endl;
    }
    
    
    // Random checks to see if sizeof(vector<S>) != sizeof(vector<T>).
    if(true)
    {
      std::size_t vecHeadSize[16] = {0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0};
      vecHeadSize[0] = sizeof(std::vector<char>(0));
      vecHeadSize[1] = sizeof(std::vector<short>(1));
      vecHeadSize[2] = sizeof(std::vector<int>(2));
      vecHeadSize[3] = sizeof(std::vector<long>(3));
      vecHeadSize[4] = sizeof(std::vector<std::size_t>(5));
      vecHeadSize[5] = sizeof(std::vector<float>(7));
      vecHeadSize[6] = sizeof(std::vector<double>(11));
      vecHeadSize[7] = sizeof(std::vector<std::vector<char> >(13));
      vecHeadSize[8] = sizeof(std::vector<std::vector<int> >(17));
      vecHeadSize[9] = sizeof(std::vector<std::vector<double> >(19));
      struct tmpclass1 { char a; short b; };
      struct tmpclass2 { char a; float b; };
      struct tmpclass3 { char a; double b; };
      struct tmpclass4 { int a; char b; };
      struct tmpclass5 { double a; char b; };
      struct tmpclass6 { double a[5]; char b[3]; short c[3]; };
      vecHeadSize[10] = sizeof(std::vector<tmpclass1>(23));
      vecHeadSize[11] = sizeof(std::vector<tmpclass2>(29));
      vecHeadSize[12] = sizeof(std::vector<tmpclass3>(31));
      vecHeadSize[13] = sizeof(std::vector<tmpclass4>(37));
      vecHeadSize[14] = sizeof(std::vector<tmpclass4>(41));
      vecHeadSize[15] = sizeof(std::vector<tmpclass4>(43));
      
      
      std::size_t notSame = 0;
      for(int i = 0; i < 16; ++i) 
        notSame += vecHeadSize[i] != sizeof(std::size_t) * 3;
      while (notSame)
      {
        std::cerr << "sizeof(std::vector<S>) != sizeof(std::vector<T>), \
PODvectorPortOffice cannot handle. Stall." << std::endl;
      }
    }
  }
  
  
  void recall() { p->Nlent = initialNlent; }
  
  
public:  
  PODvectorPortOffice(podvecport &port) 
  { 
    check();
    p = &port; 
    initialNlent = p->Nlent;
  }
  
  
  template<typename X, typename Y>
  std::vector<X> & repaint(std::vector<Y> &y) // Repaint the container.
    // AFTER A VECTOR IS REPAINTED, DO NOT USE THE OLD VECTOR AGAIN !!
  {
    while (std::is_same<bool, X>::value)
    {
      std::cerr << "PODvectorPortOffice: Cannot repaint the vector to \
std::vector<bool>. Stall." << std::endl;
    }
    std::vector<X> *x;
    std::vector<Y> *yp = &y;
    std::memcpy(&x, &yp, sizeof(x));
    return *x; // Not compliant with strict aliasing rule.
  }
  
  
  template<typename T>
  std::vector<T> & lend()
  {
    while (p->Nlent >= p->Xsize)
    {
      std::cerr << "PODvectorPortOffice: No more containers. Stall." << std::endl;
    }
    ++p->Nlent;
    return repaint<T, std::size_t>( p->X[p->Nlent - 1] );
  }
  
  
  ~PODvectorPortOffice() 
  { 
    // Because p->signature can only be known at runtime, an aggressive,
    // compliant compiler (ACC) will never remove this 
    // branch. Volatile might do, but trustworthiness?
    if(p->signature == 0)
    {
      constexpr std::size_t sizeofvec = sizeof(std::vector<std::size_t>);
      char dummy[sizeofvec * p->Xsize];    
      std::memcpy(dummy, p->X, p->Nlent * sizeofvec);
      std::size_t ticketNum = 0;
      char *xp = (char*)(p->X);
      for(int i = 0, iend = p->Nlent * sizeofvec; i < iend; ++i)
      {
        xp[i] &= xp[iend - i - 1] * 5;
        ticketNum += xp[i] ^ ticketNum;
      }
      std::cerr << "Congratulations! After the port office was decommissioned, \
you found a winning lottery ticket. The odds is less than 2.33e-10. Your \
ticket number is " << ticketNum << std::endl;
      std::memcpy(p->X, dummy, p->Nlent * sizeofvec);
      // According to the strict aliasing rule, a char* can point to any memory
  // block pointed by another pointer of any type T*. Thus given an ACC, 
  // the writes to that block via the char* must be fully acknowledged in 
  // time by T*, namely, for reading contents from T*, a reload instruction 
  // will be kept in the assembly code to achieve a sort of 
  // "register-cache-memory coherence" (RCMC).
  // We also do not care about the renters' (who received the reference via
  // .lend()) RCMC, because PODvectorPortOffice never accesses the contents
  // of those containers.
    }
    recall(); 
  }
};

欢迎提交任何意图破坏它的对抗测试案例,特别是在GCC>=8.3或者MSVS>=2019上进行的。


4
由于这种hacky的本质,我预计会受到批评。你的直觉完全正确。那个转换将严重违反 the strict aliasing rule。一旦编译器开始优化你的代码,一切都会迅速崩溃。这是未定义行为。如果你想进行比特级操作,你应该写汇编代码,而不是C ++。 - Silvio Mayolo
@SilvioMayolo 已经明白了。谢谢。 - user2961927
嗯...我曾经在一个C程序中使用了私有缓冲池来保存许多分配/释放操作。不确定这是否真的适用于此处,但您可以尝试为您的向量使用自定义分配器。 - Serge Ballesta
你能轻松“reserve”的大小有上限吗? - Caleth
@SergeBallesta 很有趣。你是如何处理别名问题的?还是为不同类型分配了不同的缓冲区? - user2961927
显示剩余11条评论
1个回答

2
让我先说一下,我认为这个问题没有一个“权威”的答案。尽管如此,你已经提供了足够的限制条件,可以提出一个有价值的建议方案。让我们回顾一下要求:
  • 解决方案必须使用std::vector。在我看来,这是最不幸的要求,原因在此不表。
  • 解决方案必须符合标准,并且不能违反规则,例如严格别名规则。
  • 解决方案必须要么减少执行的分配数量,要么将分配的开销降至可以忽略的程度。
在我看来,这绝对是自定义分配器的工作。有几个现成的选项接近于实现您想要的功能,例如Boost Pool Allocators。你最感兴趣的是boost::pool_allocator。该分配器将为每个不同的对象大小(注意:不是对象类型)创建一个单例“池”,该池根据需要增长,但在显式清除之前永远不会缩小。
这与你的解决方案的主要区别在于,你将拥有不同大小对象的不同内存池,这意味着它将使用比你发布的解决方案更多的内存,但在我看来这是一个合理的权衡。为了最大化效率,你可以通过创建每种所需类型的具有适当大小的向量来开始一批操作。所有随后使用这些分配器的向量操作都将进行微不足道的O(1)分配和释放。粗略的伪代码如下:
// be careful with this, probably want [[nodiscard]], this is code
// is just rough guidance:
void force_pool_sizes(void)
{
    std::vector<int, boost::pool_allocator<int>> size_int_vect;
    std::vector<SomePodSize16, boost::pool_allocator<SomePodSize16>> size_16_vect;
    ...
    
    size_int_vect.resize(100); // probably makes malloc calls
    size_16_vect.resize(200);  // probably makes malloc calls
    ...

    // on return, objects go out of scope, but singleton pools
    // with allocated blocks of memory remain for future use
    // until explicitly purged.
}

void expensive_long_running(void)
{
    force_pool_sizes();

    std::vector<int, boost::pool_allocator<int>> data1;
    ... do stuff, malloc/free will never be called...

    std::vector<SomePodSize16, boost::pool_allocator<SomePodSize16>> data2;
    ... do stuff, malloc/free will never be called...

    // free everything:
    boost::singleton_pool<boost::pool_allocator_tag, sizeof(int)>::release_memory();
}

如果你想更进一步地提高内存效率,如果你确切知道某些池大小是互不相交的,那么你可以修改boost的pool_allocator,使用稍微不同的单例后备存储器,这样就可以将一个内存块从一个块大小移动到另一个块大小。现在可能超出了范围,但boost代码本身足够简单,如果内存效率很关键,这可能是值得的。
值得指出的是,严格别名规则可能会导致一些混淆,特别是在实现自己的内存分配器时。有许多关于严格别名和它所做和不做的SO问题。这个链接是一个好的起点。
关键是,在低级C++代码中,将一组内存作为某种对象类型进行转换是完全普通和可接受的。如果不是这样,std::allocator就不存在了。你也没有什么用处,比如std::aligned_storage之类的东西。看看std::aligned_storagecppreference上的示例用例。创建了一个类似STL的static_vector类,它保留了一组aligned_storage对象,这些对象被重新转换为具体类型。这里没有什么“不可接受”或“非法”的东西,但是需要额外的知识和处理注意事项。
你的解决方案之所以特别会激怒代码律师,是因为你正在将一个非char对象类型的指针转换为不同的非char对象类型。这是严格别名规则的一个特别令人反感的违反,但也不是必要的,因为有其他选择。
还要记住,别名内存并不是错误,而是警告。我不是说要疯狂地使用别名,但是我想说的是,对于所有C和C++的事情,当你非常深入地了解你的编译器和你运行的机器时,有些规则是可以打破的。只要准备好进行一些非常漫长和痛苦的调试会话,如果事实证明你确实不像你认为的那样了解这两件事。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接