在C++中分配大内存块

8
我将尝试在C++中为一个由浮点数值组成的3D矩阵分配一块大内存。其维度为44100x2200x2。这应该需要精确地分配44100x2200x2x4字节的内存,约为7.7GB。我使用Ubuntu上的64位x86机器上的g++编译我的代码。当我使用htop查看进程时,我看到内存使用量增长到32GB并迅速被杀死。我在内存计算中犯了错误吗?
以下是我的代码:
#include <iostream>

using namespace std;
int main(int argc, char* argv[]) {
  int N = 22000;
  int M = 44100;
  float*** a = new float**[N];
  for (int m = 0; m<N; m+=1) {
    cout<<((float)m/(float)N)<<endl;
    a[m] = new float*[M - 1];
    for (int n = 0; n<M - 1; n+=1) {
      a[m][n] = new float[2];
    }
  }
}

编辑:我的计算有误,我分配了接近38GB的空间。我已经修复了代码,现在分配15GB。

#include <iostream>

using namespace std;
int main(int argc, char* argv[]) {
  unsigned long  N = 22000;
  unsigned long  M = 44100;
  unsigned long blk_dim = N*(M-1)*2;
  float* blk = new float[blk_dim];
  unsigned long b = (unsigned long) blk;

  float*** a = new float**[N];
  for (int m = 0; m<N; m+=1) {
    unsigned long offset1 = m*(M - 1)*2*sizeof(float);
    a[m] = new float*[M - 1];
    for (int n = 0; n<M - 1; n+=1) {
      unsigned long offset2 = n*2*sizeof(float);
      a[m][n] = (float*)(offset1 + offset2 + b);
    }
  }
}

2
这意味着需要7.7GB的连续内存。对于分配器来说,这是一个相当困难的要求,许多系统都无法达到。你能把它分成许多较小的块吗? - Alejandro
3
@Alejandro,它(指实现)是如何保持连续存储的?每个“new”不是都是单独的/非连续的(可能效率较低),这样做不是更难吗?或者“相当困难的任务”是指它不是这种情况? - user2864740
2
@Alejandro 虚拟 连续,但不是物理上连续的。这需要大量的内存,但在Linux上,它将直接使用 mmap(2) - Jonathon Reinhart
1
@SpentDeath 1) 多次说错并不意味着它是正确的[8 不能 正确,因为上面已经说明了原因]; 2) 代码中的N是22,000而不是2,200; (3. 如果注意到括号..) - user2864740
1
你能解释一下为什么你说“我正在尝试分配一个大内存块”,然后却发布了实际上分配了数百万个小内存块的代码吗? - M.M
显示剩余10条评论
3个回答

13

您忘记了一维,还有分配内存的开销。所示代码在第三个维度上分配内存非常低效,导致开销过大。

float*** a = new float**[N];

这将分配大约22000 * sizeof(float **)的内存,大约为176KB。可以忽略不计。

a[m] = new float*[M - 1];
一个单一的分配将是为了 44099 * sizeof(float *),但你会获取其中的22000个。 22000 * 44099 * sizeof(float *),或者大约7.7GB的额外内存。这是你停止计数的地方,但你的代码还没有完成。它还有很长的路要走。
a[m][n] = new float[2];

这是8字节的单次分配,但此分配将执行22000 * 44099次。 这又是7.7GB被浪费了。现在您需要超过15 GB的应用程序所需内存,大致需要分配。

但是每个分配并不是免费的new float[2]需要更多的8字节。每个单独分配的块必须在C++库中进行内部跟踪,以便可以通过delete回收。最简单的基于链接列表的堆分配实现需要一个向前指针,一个向后指针和分配块中有多少字节的计数。假设为了对齐目的而不需要填充任何内容,则在64位平台上,每次分配的开销至少为24字节的开销。

现在,由于第三个维度进行了22000 * 44099次分配,第二个维度进行了22000次分配,第一维度进行了一次分配:如果我使用手指计算,这将需要(22000 * 44099 + 22000 + 1)* 24,或者另外22 GB内存,仅用于消耗最简单基本的内存分配方案的开销。

如果我的数学没错,我们现在需要使用最简单、可能的堆分配跟踪约38 GB的RAM。您的C++实现可能会使用更复杂的堆分配逻辑,并具有更大的开销。

摆脱new float[2]。计算矩阵的大小,然后new一个7.7GB的单块,接着计算其余指针应该指向哪里。还要为矩阵的第二个维度分配单个内存块,并计算第一维度的指针。

您的分配代码应该准确地执行三个new语句。一个用于第一维度指针,一个用于第二维度指针。还需要一个巨大的数据块的new语句,它包含您的第三个维度。


2
任何使用如此多的开销来分配8字节块的分配方案都是垃圾。一个合理的分配方案最多只会使用8个字节来进行分配,而一个好的分配方案将只使用一页内专门用于特定大小块的小块内存,因此对于8字节的分配仅有1-2%的开销。 - Chris Dodd
44100和22000都是CLI参数。它们现在作为示例进行硬编码。 - lee
4
在这种情况下,你可以使用float * bigArray = new float[M*N*O];,并编写获取器和设置器方法来计算一维数组中正确的偏移量。重要的是,由于数组是规则的,因此您只需要进行一次new调用。 - Jeremy Friesner
2
@SpentDeath 我的意思是你正在为大量单元格分配空间。如果其中大部分实际上不需要(它们本来会携带一些默认值),并且您不需要将此怪物的部分或全部馈送给某些需要连续内存的API(您发布的代码中没有这种情况),则可以利用其他选项。例如,将int映射到int的2个单元格数组的映射。性能可能不会很好(显然,我们正在通过无序映射进行哈希处理),但内存利用率可能会更好。 - WhozCraig
@WhozCraig 矩阵中的所有条目都是非零的。这是一个预先计算的表格,在许多操作中使用。它需要保持在内存中,并且所有部分都需要可访问。您不能将其子部分保存在内存中,因为可能有计算遍历所有矩阵值并需要访问矩阵中任意位置的向量。 - lee
显示剩余6条评论

4

为了补充已经给出的一个答案,下面的例子基本上是如何创建连续的二维数组中给出答案的扩展,它演示了只使用3个new[]调用的用法。

优点是保留了你通常使用三级指针时会用到的[][][]语法(虽然我强烈建议不要像这样使用“3颗星”的代码编写,但我们所拥有的就是这样)。缺点是除了单一的数据内存池之外,还分配了更多指针内存。

#include <iostream>
#include <exception>

template <typename T>
T*** create3DArray(unsigned pages, unsigned nrows, unsigned ncols, const T& val = T())
{
    T*** ptr = nullptr;  // allocate pointers to pages
    T** ptrMem = nullptr;
    T* pool = nullptr;
    try 
    {
        ptr = new T**[pages];  // allocate pointers to pages
        ptrMem = new T*[pages * nrows]; // allocate pointers to pool
        pool = new T[nrows*ncols*pages]{ val };  // allocate pool

        // Assign page pointers to point to the pages memory,
        // and pool pointers to point to each row the data pool
        for (unsigned i = 0; i < pages; ++i, ptrMem += nrows)
        {
            ptr[i] = ptrMem;
            for (unsigned j = 0; j < nrows; ++j, pool += ncols)
                ptr[i][j] = pool;
        }
        return ptr;
     }
     catch(std::bad_alloc& ex)
     {
         // rollback the previous allocations
        delete [] ptrMem;
        delete [] ptr;
        throw ex; 
    }
}

template <typename T>
void delete3DArray(T*** arr)
{
    delete[] arr[0][0]; // remove pool
    delete[] arr[0];  // remove the pointers
    delete[] arr;     // remove the pages
}

int main()
{
    double ***dPtr = nullptr;
    try 
    {
        dPtr = create3DArray<double>(4100, 5000, 2);
    }
    catch(std::bad_alloc& )
    {
        std::cout << "Could not allocate memory";
        return -1;
    }
    dPtr[0][0][0] = 10;  // for example
    std::cout << dPtr[0][0][0] << "\n";
    delete3DArray(dPtr);  // free the memory
}

实时例子


这是一个基本的RAII类。虽然没有经过彻底测试,但应该可以正常工作。

#include <exception>
#include <algorithm>
#include <iostream>

template <typename T>
class Array3D
{
    T*** data_ptr;
    unsigned m_rows;
    unsigned m_cols;
    unsigned m_pages;

    T*** create3DArray(unsigned pages, unsigned nrows, unsigned ncols, const T& val = T()) 
    {
        T*** ptr = nullptr;  // allocate pointers to pages
        T** ptrMem = nullptr;
        T* pool = nullptr;
        try
        {
            ptr = new T * *[pages];  // allocate pointers to pages
            ptrMem = new T * [pages * nrows]; // allocate pointers to pool
            pool = new T[nrows * ncols * pages]{ val };  // allocate pool

            // Assign page pointers to point to the pages memory,
            // and pool pointers to point to each row the data pool
            for (unsigned i = 0; i < pages; ++i, ptrMem += nrows)
            {
                ptr[i] = ptrMem;
                for (unsigned j = 0; j < nrows; ++j, pool += ncols)
                    ptr[i][j] = pool;
            }
            return ptr;
        }
        catch (std::bad_alloc& ex)
        {
            // rollback the previous allocations
            delete[] ptrMem;
            delete[] ptr;
            throw ex;
        }
    }

    public:
        typedef T value_type;
        T*** data() { return data_ptr;  }

        unsigned get_num_pages() const { return m_pages; }
        unsigned get_num_rows() const { return m_rows; }
        unsigned get_num_cols() const { return m_cols; }

        Array3D() : data_ptr{}, m_rows{}, m_cols{}, m_pages{} {}

        Array3D(unsigned pages, unsigned rows, unsigned cols, const T& val = T())
        {
            if ( pages == 0 )
                throw std::invalid_argument("number of pages is 0");
            if (rows == 0)
                throw std::invalid_argument("number of rows is 0");
            if (cols == 0)
                throw std::invalid_argument("number of columns is 0");
            data_ptr = create3DArray(pages, rows, cols, val);
            m_pages = pages;
            m_rows = rows;
            m_cols = cols;
        }

        ~Array3D()
        {
            if (data_ptr)
            {
                delete[] data_ptr[0][0]; // remove pool
                delete[] data_ptr[0];  // remove the pointers
                delete[] data_ptr;     // remove the pages
            }
        }

        Array3D(const Array3D& rhs) : m_rows(rhs.m_rows), m_cols(rhs.m_cols), m_pages(rhs.m_pages)
        {
            data_ptr = create3DArray(m_pages, m_rows, m_cols);
            std::copy(&rhs.data_ptr[0][0][0], &rhs.data_ptr[m_pages - 1][m_rows - 1][m_cols], &data_ptr[0][0][0]);
        }

        Array3D& operator=(const Array3D& rhs)
        {
            if (&rhs != this)
            {
                Array3D temp(rhs);
                swap(*this, temp);
            }
            return *this;
        }

        Array3D(Array3D&& rhs) noexcept : data_ptr(rhs.data_ptr), m_pages(rhs.m_pages), m_rows(rhs.m_rows), m_cols(rhs.m_cols)
        {
            rhs.data_ptr = nullptr;
        }

        Array3D& operator =(Array3D&& rhs) noexcept
        {
            if (&rhs != this)
            {
                swap(rhs, *this);
            }
            return *this;
        }

        void swap(Array3D& left, Array3D& right)
        {
            std::swap(left.data_ptr, right.data_ptr);
            std::swap(left.m_cols, right.m_cols);
            std::swap(left.m_rows, right.m_rows);
            std::swap(left.m_pages, right.m_pages);
        }

        T** operator[](unsigned page)
        {
            return data_ptr[page];
        }

        const T** operator[](unsigned page) const
        {
            return data_ptr[page];
        }
    };

int main()
{
    try
    {
        Array3D<double> dPtr(10, 10, 10);
        dPtr[0][0][0] = 20;
        dPtr[0][0][3] = -23;
        std::cout << dPtr[0][0][0] << " " << dPtr[0][0][3] << "\n";
        Array3D<double> test = dPtr;
        std::cout << test[0][0][0] << " " << test[0][0][3] << "\n";
        Array3D<double> test2;
        test2 = test;
        std::cout << test2[0][0][0] << " " << test2[0][0][3] << "\n";
    }
    catch (std::exception& ex)
    {
        std::cout << ex.what();
    }
}

示例实况


我喜欢这个的通用性。比我编辑中的笨拙版本好多了。有没有办法重载原始类型的构造函数和析构函数?我有一个更好的东西想要。 - lee
2
你可以将这些内容封装在一个类中,正如链接答案所述,以利用 RAII(即当对象超出范围时自动解除分配数组内存)。请注意,您还需要为该类添加拷贝构造函数和赋值运算符以及析构函数。我有点懒,没有创建一个完整的类,只想展示如何开始组合这样的东西的内部工作原理。 - PaulMcKenzie
我在考虑将其推广到任何类型T的n维。我意识到我的想法行不通。有没有办法将其推广到N维?因为在不久的将来,我需要处理由2D向量组成的N维矩阵。 - lee
2
@SpentDeath:假设N是一个编译时常量,那就不太难,但需要一些模板递归。在顶层,你创建池T [dim1*...* dimN],在第一级别,你创建T* [dim1*...*dimN-1],直到在最后一级别只创建T**** [dim1] - MSalters
@PaulMcKenzie 我提交了一个简单(嗯,相当简单)的类实现。 - Davislor

2
那可能是您问题的简化版本,但您使用的数据结构(“三星级”数组)几乎从不是您想要的。如果您创建的是类似于这里的密集矩阵,并为每个元素分配空间,则制作数百万个微小分配毫无优势。如果要使用稀疏矩阵,则通常需要像压缩稀疏行这样的格式。
如果数组是“矩形”的(或者我认为3D数组会是“盒状”的),并且所有行和列的大小都相同,则与分配单个内存块相比,此数据结构纯粹是浪费的。您执行了数百万个微小的分配,并为数百万个指针分配了空间,失去了内存局部性。
这个样板为动态3D数组创建了一个零成本抽象。 (好吧,几乎:存储基础一维std :: vector的长度和各个维度是多余的。)API使用a(i,j,k)作为等效于a [i] [j] [k]a.at(i,j,k) 作为带有边界检查的变体。
此API还提供一种选项,可以使用索引函数f(i,j,k)填充数组。如果调用a.generate(f),它会设置每个a(i,j,k)= f(i,j,k)。理论上,这将减少内部循环中的偏移量计算,使其更快。API还可以将生成函数作为array3d<float>(M,N,P,f)传递给构造函数。请根据需要进行扩展。
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <functional>
#include <iomanip>
#include <iostream>
#include <vector>

using std::cout;
using std::endl;
using std::ptrdiff_t;
using std::size_t;

/* In a real-world implementation, this class would be split into a
 * header file and a definitions file.
 */
template <typename T>
  class array3d {
    public:
    using value_type = T;
    using size_type = size_t;
    using difference_type = ptrdiff_t;
    using reference = T&;
    using const_reference = const T&;
    using pointer = T*;
    using const_pointer = const T*;
    using iterator = typename std::vector<T>::iterator;
    using const_iterator = typename std::vector<T>::const_iterator;
    using reverse_iterator = typename std::vector<T>::reverse_iterator;
    using const_reverse_iterator = typename
      std::vector<T>::const_reverse_iterator;

/* For this trivial example, I don’t define a default constructor or an API
 * to resize a 3D array.
 */
    array3d( const ptrdiff_t rows,
             const ptrdiff_t cols,
             const ptrdiff_t layers )
    {
      const ptrdiff_t nelements = rows*cols*layers;

      assert(rows > 0);
      assert(cols > 0);
      assert(layers > 0);
      assert(nelements > 0);

      nrows = rows;
      ncols = cols;
      nlayers = layers;
      storage.resize(static_cast<size_t>(nelements));
    }

/* Variant that initializes an array with bounds and then fills each element
 * (i,j,k) with a provided function f(i,j,k).
 */
    array3d( const ptrdiff_t rows,
             const ptrdiff_t cols,
             const ptrdiff_t layers,
             const std::function<T(ptrdiff_t, ptrdiff_t, ptrdiff_t)> f )
    {
      const ptrdiff_t nelements = rows*cols*layers;

      assert(rows > 0);
      assert(cols > 0);
      assert(layers > 0);
      assert(nelements > 0);

      nrows = rows;
      ncols = cols;
      nlayers = layers;
      storage.reserve(static_cast<size_t>(nelements));

      for ( ptrdiff_t i = 0; i < nrows; ++i )
        for ( ptrdiff_t j = 0; j < ncols; ++j )
          for ( ptrdiff_t k = 0; k < nlayers; ++k )
            storage.emplace_back(f(i,j,k));

      assert( storage.size() == static_cast<size_t>(nelements) );
    }

    // Rule of 5:
    array3d( const array3d& ) = default;
    array3d& operator= ( const array3d& ) = default;
    array3d( array3d&& ) = default;
    array3d& operator= (array3d&&) = default;

    /* a(i,j,k) is the equivalent of a[i][j][k], except that the indices are
     * signed rather than unsigned.  WARNING: It does not check bounds!
     */
    T& operator() ( const ptrdiff_t i,
                    const ptrdiff_t j,
                    const ptrdiff_t k ) noexcept
    {
      return storage[make_index(i,j,k)];
    }

    const T& operator() ( const ptrdiff_t i,
                          const ptrdiff_t j,
                          const ptrdiff_t k ) const noexcept
    {
      return const_cast<array3d&>(*this)(i,j,k);
    }

    /* a.at(i,j,k) checks bounds.  Error-checking is by assertion, rather than
     * by exception, and the indices are signed.
     */
    T& at( const ptrdiff_t i, const ptrdiff_t j, const ptrdiff_t k )
    {
      bounds_check(i,j,k);
      return (*this)(i,j,k);
    }

    const T& at( const ptrdiff_t i,
                 const ptrdiff_t j,
                 const ptrdiff_t k ) const
    {
      return const_cast<array3d&>(*this).at(i,j,k);
    }

/* Given a function or function object f(i,j,k), fills each element of the
 * container with a(i,j,k) = f(i,j,k).
 */
    void generate( const std::function<T(ptrdiff_t,
                                         ptrdiff_t,
                                         ptrdiff_t)> f )
    {
      iterator it = storage.begin();

      for ( ptrdiff_t i = 0; i < nrows; ++i )
        for ( ptrdiff_t j = 0; j < ncols; ++j )
          for ( ptrdiff_t k = 0; k < nlayers; ++k )
            *it++ = f(i,j,k);

      assert(it == storage.end());
    }

/* Could define a larger API, e.g. begin(), end(), rbegin() and rend() from the STL.
 * Whatever you need.
 */

    private:
    ptrdiff_t nrows, ncols, nlayers;
    std::vector<T> storage;

    constexpr size_t make_index( const ptrdiff_t i,
                                 const ptrdiff_t j,
                                 const ptrdiff_t k ) const noexcept
    {
      return static_cast<size_t>((i*ncols + j)*nlayers + k);
    }

    // This could instead throw std::out_of_range, like STL containers.
    constexpr void bounds_check( const ptrdiff_t i,
                                 const ptrdiff_t j,
                                 const ptrdiff_t k ) const
    {
      assert( i >=0 && i < nrows );
      assert( j >= 0 && j < ncols );
      assert( k >= 0 && k < nlayers );
    }
};

// In a real-world scenario, this test driver would be in another source file:

constexpr float f( const ptrdiff_t i, const ptrdiff_t j, const ptrdiff_t k )
{
  return static_cast<float>( k==0 ? 1.0 : -1.0 *
                             ((double)i + (double)j*1E-4));
}

int main(void)
{
  constexpr ptrdiff_t N = 2200, M = 4410, P = 2;
  const array3d<float> a(N, M, P, f);

  // Should be: -1234.4321
  cout << std::setprecision(8) << a.at(1234,4321,1) << endl;

  return EXIT_SUCCESS;
}

值得注意的是,这段代码在技术上包含未定义行为:它假设有符号整数乘法溢出会产生负数,但实际上,如果程序在运行时请求了一些荒谬的内存量,编译器有权生成完全错误的代码。
当然,如果数组边界是常量,只需声明它们为constexpr并使用具有固定边界的数组即可。
不幸的是,每个新的C++程序员都首先学习char** argv,因为这让人们认为“二维”数组是指指向行的指针的“不规则”数组。
在现实世界中,这几乎从来不是最适合工作的数据结构。

2
尽管使用这样的类并不被推荐,但我认为仅仅通过尝试理解它,人们就可以学到很多关于C++的知识,因此这是一个非常有趣的POC。 - Sam96

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接