C++ - 高效计算向量矩阵乘积

4
我需要尽可能高效地计算一个向量矩阵的积。具体来说,给定向量s和矩阵A,我需要计算s * A。我有一个名为Vector的类,它包装了一个std::vector,以及一个名为Matrix的类,它也包装了一个std::vector(为了提高效率)。
目前使用的朴素方法是设计如下:
Vector<T> timesMatrix(Matrix<T>& matrix)
{
    Vector<unsigned int> result(matrix.columns());
    // constructor that does a resize on the underlying std::vector

    for(unsigned int i = 0 ; i < vector.size() ; ++i)
    {
        for(unsigned int j = 0 ; j < matrix.columns() ; ++j)
        {
            result[j] += (vector[i] * matrix.getElementAt(i, j));
            // getElementAt accesses the appropriate entry
            // of the underlying std::vector
        }
    }
    return result;
}

它运行良好,耗时近12000微秒。请注意,向量 s 具有499个元素,而A是499 x 15500。
下一步是尝试并行计算:如果我有N个线程,则可以将向量 s 的每个部分和矩阵 A 的“相应”行分配给每个线程。每个线程将计算一个大小为499的向量,最终结果将是它们的逐个条目之和。
首先,在类Matrix中,我添加了一个方法来从Matrix中提取一些行并构建一个较小的矩阵:
Matrix<T> extractSomeRows(unsigned int start, unsigned int end)
{
    unsigned int rowsToExtract = end - start + 1;
    std::vector<T> tmp;
    tmp.reserve(rowsToExtract * numColumns);
    for(unsigned int i = start * numColumns ; i < (end+1) * numColumns ; ++i)
    {
        tmp.push_back(matrix[i]);
    }
    return Matrix<T>(rowsToExtract, numColumns, tmp);
}

然后我定义了一个线程例程

void timesMatrixThreadRoutine
    (Matrix<T>& matrix, unsigned int start, unsigned int end, Vector<T>& newRow)
{
    // newRow is supposed to contain the partial result
    // computed by a thread
    newRow.resize(matrix.columns());
    for(unsigned int i = start ; i < end + 1 ; ++i)
    {
        for(unsigned int j = 0 ; j < matrix.columns() ; ++j)
        {
            newRow[j] += vector[i] * matrix.getElementAt(i - start, j);
        }
    }
}

最后,我修改了上述展示的 timesMatrix 方法的代码:
Vector<T> timesMatrix(Matrix<T>& matrix)
{
    static const unsigned int NUM_THREADS = 4;
    unsigned int matRows = matrix.rows();
    unsigned int matColumns = matrix.columns();
    unsigned int rowsEachThread = vector.size()/NUM_THREADS;

    std::thread threads[NUM_THREADS];
    Vector<T> tmp[NUM_THREADS];

    unsigned int start, end;

    // all but the last thread
    for(unsigned int i = 0 ; i < NUM_THREADS - 1 ; ++i)
    {
        start = i*rowsEachThread;
        end = (i+1)*rowsEachThread - 1;

        threads[i] = std::thread(&Vector<T>::timesMatrixThreadRoutine, this,
            matrix.extractSomeRows(start, end), start, end, std::ref(tmp[i]));
    }

    // last thread
    start = (NUM_THREADS-1)*rowsEachThread;
    end = matRows - 1;
    threads[NUM_THREADS - 1] = std::thread(&Vector<T>::timesMatrixThreadRoutine, this,
        matrix.extractSomeRows(start, end), start, end, std::ref(tmp[NUM_THREADS-1]));

    for(unsigned int i = 0 ; i < NUM_THREADS ; ++i)
    {
        threads[i].join();
    }

    Vector<unsigned int> result(matColumns);
    for(unsigned int i = 0 ; i < NUM_THREADS ; ++i)
    {
        result = result + tmp[i];    // the operator+ is overloaded
    }

    return result;
}

它仍然运行,但现在需要近30000微秒,几乎是之前的三倍。

我做错了什么吗?你认为有更好的方法吗?


编辑 - 使用"轻量级" VirtualMatrix

根据Ilya Ovodov的建议,我定义了一个类VirtualMatrix,它包装一个T* matrixData,并在构造函数中初始化为:

VirtualMatrix(Matrix<T>& m)
{
    numRows = m.rows();
    numColumns = m.columns();
    matrixData = m.pointerToData();
    // pointerToData() returns underlyingVector.data();
}

然后有一种方法可以检索矩阵的特定条目:
inline T getElementAt(unsigned int row, unsigned int column)
{
    return *(matrixData + row*numColumns + column);
}

现在执行时间有所改善(大约8000微秒),但也许还有一些改进的空间。特别是线程例程现在

void timesMatrixThreadRoutine
    (VirtualMatrix<T>& matrix, unsigned int startRow, unsigned int endRow, Vector<T>& newRow)
{
    unsigned int matColumns = matrix.columns();
    newRow.resize(matColumns);
    for(unsigned int i = startRow ; i < endRow + 1 ; ++i)
    {
        for(unsigned int j = 0 ; j < matColumns ; ++j)
        {
            newRow[j] += (vector[i] * matrix.getElementAt(i, j));
        }
    }
}

真正缓慢的部分是嵌套for循环。如果我将其删除,则结果显然是错误的,但“计算”时间不到500微秒。这就是说,现在传递参数几乎不需要时间,而重要的是计算。

根据您的看法,是否有任何方法可以使其更快?

2个回答

2

实际上,在extractSomeRows函数中,你为每个线程制作了矩阵的部分副本。这需要很长时间。 重新设计它,使得“一些行”成为指向原始矩阵中数据的虚拟矩阵。


此外,似乎您将矩阵数据复制了两次(第一次复制到std::vector<T> tmp,然后再复制到矩阵中)。 - Ilya Ovodov
谢谢您的建议:尝试一下会很有趣。现在我的问题是“我该如何实现虚拟矩阵?” - minomic
如果矩阵将其数据存储在向量中,则matrix.getElementAt(i, j)的实现类似于*(vector.data() + rowcount*i + j)。 - Ilya Ovodov
现在我有return matrix[row*numColumns + column],其中matrix是存储所有元素的std::vector的名称。我可以尝试使用data()来查看它是否更快。 - minomic
如果Matrix将其数据存储在向量中,则matrix.getElementAt(i, j)的实现类似于return (vector.data() + column_counti + j)。创建VirtualMatrix类,使用对主矩阵向量数据的引用进行初始化。然后,它的getElementAt(i, j)必须实现为*(vector.data() + column_count*(i + start_row) + j)。 - Ilya Ovodov

1

通过更加明确地表达你要乘以4,例如对于x86-64 SSE2+和可能的ARM'S NEON架构,使用向量化汇编指令。

如果你明确地使操作发生在相邻元素中,C++编译器通常可以将循环展开为向量化代码:

在C / C ++中进行简单快速的矩阵向量乘法

还有使用专门用于矩阵乘法的库的选项。对于较大的矩阵,使用基于快速傅里叶变换、Strassen算法等替代算法的特殊实现可能更有效。事实上,你最好使用像这样的C库,然后将其包装在类似于C ++向量的接口中。


我尝试手动展开四个操作,但执行时间并没有改变。我承认我对FFT技术几乎一无所知,所以我会尝试阅读一些资料,看看我能找到什么。 - minomic

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接