C++ 多线程应用比非多线程应用运行得更慢

4

我正在用C++编写一个质数生成器。我首先做了单线程版本,然后又做了多线程版本。

我发现如果我的程序生成的值小于100'000,单线程版本比多线程版本更快。显然我做错了什么。

下面是我的代码:

#include <iostream>
#include <fstream>
#include <set>
#include <string>
#include <thread>
#include <mutex>
#include <shared_mutex>

using namespace std;

set<unsigned long long> primeContainer;
shared_mutex m;

void checkPrime(const unsigned long long p)
{
    if (p % 3 == 0)
        return;

    bool isPrime = true;
    for (set<unsigned long long>::const_iterator it = primeContainer.cbegin(); it != primeContainer.cend(); ++it)
    {
        if (p % *it == 0)
        {
            isPrime = false;
            break;
        }
        if (*it * *it > p) // check only up to square root
            break;
    }

    if (isPrime)
        primeContainer.insert(p);
}

void checkPrimeLock(const unsigned long long p)
{
    if (p % 3 == 0)
        return;

    bool isPrime = true;
    try
    {
        shared_lock<shared_mutex> l(m);
        for (set<unsigned long long>::const_iterator it = primeContainer.cbegin(); it != primeContainer.cend(); ++it)
        {
            if (p % *it == 0)
            {
                isPrime = false;
                break;
            }
            if (*it * *it > p)
                break;
        }
    }
    catch (exception& e)
    {
        cout << e.what() << endl;
        system("pause");
    }

    if (isPrime)
    {
        try
        {
            unique_lock<shared_mutex> l(m);
            primeContainer.insert(p);
        }
        catch (exception& e)
        {
            cout << e.what() << endl;
            system("pause");
        }
    }
}

void runLoopThread(const unsigned long long& l)
{
    for (unsigned long long i = 10; i < l; i += 10)
    {
        thread t1(checkPrimeLock, i + 1);
        thread t2(checkPrimeLock, i + 3);
        thread t3(checkPrimeLock, i + 7);
        thread t4(checkPrimeLock, i + 9);
        t1.join();
        t2.join();
        t3.join();
        t4.join();
    }
}

void runLoop(const unsigned long long& l)
{
    for (unsigned long long i = 10; i < l; i += 10)
    {
        checkPrime(i + 1);
        checkPrime(i + 3);
        checkPrime(i + 7);
        checkPrime(i + 9);
    }
}

void printPrimes(const unsigned long long& l)
{
    if (1U <= l)
        cout << "1 ";
    if (2U <= l)
        cout << "2 ";
    if (3U <= l)
        cout << "3 ";
    if (5U <= l)
        cout << "5 ";

    for (auto it = primeContainer.cbegin(); it != primeContainer.cend(); ++it)
    {
        if (*it <= l)
            cout << *it << " ";
    }
    cout << endl;
}

void writeToFile(const unsigned long long& l)
{
    string name = "primes_" + to_string(l) + ".txt";
    ofstream f(name);

    if (f.is_open())
    {
        if (1U <= l)
            f << "1 ";
        if (2U <= l)
            f << "2 ";
        if (3U <= l)
            f << "3 ";
        if (5U <= l)
            f << "5 ";

        for (auto it = primeContainer.cbegin(); it != primeContainer.cend(); ++it)
        {
            if (*it <= l)
                f << *it << " ";
        }
    }
    else
    {
        cout << "Error opening file." << endl;
        system("pause");
    }
}

int main()
{
    unsigned int n = thread::hardware_concurrency();
    std::cout << n << " concurrent threads are supported." << endl;

    unsigned long long limit;
    cout << "Please enter the limit of prime generation: ";
    cin >> limit;

    primeContainer.insert(7);

    if (10 < limit)
    {
        //runLoop(limit); //single-threaded
        runLoopThread(limit); //multi-threaded
    }

    printPrimes(limit);
    //writeToFile(limit);
    system("pause");
    return 0;
}

main函数中,您会发现有关哪些函数是单线程和多线程的注释。
这两者之间的主要区别在于使用锁,容器迭代使用共享锁,插入使用唯一锁。如果有影响的话,我的CPU有4个核心。
为什么单线程版本更快?

10
多线程并不保证更快,通常会更慢,特别是在存在锁的情况下,如果不小心处理,锁会将代码转换回同步行为。 - johnbakers
3
我认为这是由于创建线程的额外开销。启动每个线程和关闭它们的时间远远大于通过并行计算所节省的时间。你可以不创建一个线程来检查单个数字,而是让同一线程检查所有以1、3为结尾的数字等等。然而,在这种情况下,你需要调整素性测试以处理以不同速率运行的线程。 - Joe F
4个回答

10
您有几个问题。
首先,您不必要地创建和销毁线程。让每个线程循环执行工作,直到没有更多工作可做。
其次,您的锁定粒度过细,结果您过于频繁地获取它们。让每个线程获取一块100个数字进行测试,而不是一个接一个地获取,然后一次性将每块中找到的素数插入其中。

1
+1 通过使您的线程检查不相交的数字集并每个写入其自己的容器,您可以完全避免锁定。当线程完成时,只需连接所有容器即可。 - user2693780
2
这并不像“线程可以让事情变得更快”那么简单。很多东西可以通过线程加速,但要做到足够好以获得显著的性能提升并不简单。具有讽刺意味的是,在像这样的“玩具”代码中,它比在真正的代码中更难实现。 - David Schwartz
感谢您的建议,我进行了一些改进:现在每个线程分别处理以1、3、7和9结尾的值。这解决了您的第一个问题。对于您的第二个问题,我仍在努力思考。我喜欢@denniskb的建议,因为我现在只能到达容器的前半部分,无法到达末尾。我可以为每个线程保留一个本地集合,将新发现的质数插入这些本地集合中,当某个线程到达主集合的末尾时,通知线程将其本地集合倒入主集合中。不过,这听起来需要更多的锁使用。不确定 :/ - IDDQD
@S.T.A.L.K.E.R. 实际上,我的方法根本不需要任何锁定。我在下面以答案的形式发布了一个示例。 - user2693780

4
在我看来,您似乎对每个单独的质数检查都开了一个新线程。我认为这样不好,因为每个质数的计算加上线程启动/关闭以及同步的时间会增加。启动线程可能相当慢。
我建议在主循环之外启动这4个线程,并在每个线程中处理1/4的范围。但这可能需要额外的同步,因为要检查质数,上面的代码显然需要首先有小于sqrt N的质数可用。
从我的角度来看,使用Sieve of Erastothenes算法可能更容易并行化,而且不需要锁定(但仍可能遇到称为“false sharing”的问题)。 编辑 这里我快速创建了一个使用埃拉托色尼筛选法的版本:
void processSieve(const unsigned long long& l,
    const unsigned long long& start,
    const unsigned long long& end,
    const unsigned long long& step,
    vector<char> &is_prime)
{
    for (unsigned long long i = start; i <= end; i += step)
        if (is_prime[i])
            for (unsigned long long j = i + i; j <= l; j += i)
                is_prime[j] = 0;
}

void runSieve(const unsigned long long& l)
{
    vector<char> is_prime(l + 1, 1);
    unsigned long long end = sqrt(l);
    processSieve(l, 2, end, 1, is_prime);
    primeContainer.clear();
    for (unsigned long long i = 1; i <= l; ++i)
        if (is_prime[i])
            primeContainer.insert(i);
}

void runSieveThreads(const unsigned long long& l)
{
    vector<char> is_prime(l + 1, 1);
    unsigned long long end = sqrt(l);
    vector<thread> threads;
    threads.reserve(cpuCount);
    for (unsigned long long i = 0; i < cpuCount; ++i)
        threads.emplace_back(processSieve, l, 2 + i, end, cpuCount, ref(is_prime));
    for (unsigned long long i = 0; i < cpuCount; ++i)
        threads[i].join();
    primeContainer.clear();
    for (unsigned long long i = 1; i <= l; ++i)
        if (is_prime[i])
            primeContainer.insert(i);
}

测量结果,1 000 000 以内的质数(MSVC 2013,发布版):

runLoop: 204.02 ms
runLoopThread: 43947.4 ms
runSieve: 30.003 ms
runSieveThreads (8 cores): 24.0024 ms

高达10,0000,000:

runLoop: 4387.44 ms
// runLoopThread disabled, taking too long
runSieve: 350.035 ms
runSieveThreads (8 cores): 285.029 ms

这段文字的翻译如下:

时间包括向量的最终处理和将结果推送到主要集合中。

正如您所看到的,筛法版本比您的版本快得多,即使在单线程版本中也是如此(对于您的互斥版本,我不得不将锁更改为常规互斥锁,因为MSVC 2013没有共享锁,因此结果可能比您的差得多)。

但是,您可以看到筛法的多线程版本仍然无法像预期的那样运行得快(8个核心,即8个线程,线性加速比单个线程快8倍),尽管没有锁定(权衡一些数字可能会不必要地运行,如果它们尚未被其他线程标记为“无素数”,但总体上结果应该是稳定的,因为每次只设置为0,不管是否同时由多个线程设置)。速度提升不是线性的原因很可能是因为之前提到的“false sharing”问题——写入零的线程使彼此的缓存行无效。


2

因为评论区已经有点拥挤了,而且 OP 表示对不加锁的解决方案感兴趣,下面提供一个这样的方法示例(半伪代码):

vector<uint64_t> primes_thread1;
vector<uint64_t> primes_thread2;
...

// check all numbers in [start, end)
void check_primes(uint64_t start, uint64_t end, vector<uint64_t> & out) {
    for (auto i = start; i < end; ++i) {
        if (is_prime(i)) { // simply loop through all odds from 3 to sqrt(i)
            out.push_back(i);
        }
    }
}

auto f1 = async(check_primes, 1, 1000'000, ref(primes_thread1));
auto f2 = async(check_primes, 1000'000, 2000'000, ref(primes_thread2));
...

f1.wait();
f2.wait();
...

primes_thread1.insert(
    primes_thread1.begin(),
    primes_thread2.cbegin(), primes_thread2.cend()
);
primes_thread1.insert(
    primes_thread1.begin(),
    primes_thread3.cbegin(), primes_thread3.cend()
);
...
// primes_thread1 contains all primes found in all threads

显然,可以通过参数化线程数量和每个范围的大小来很好地重构此代码。我之所以冗长,是为了(希望)更清楚地说明通过一开始不共享任何状态来避免锁定的概念。

1

你的素数测试中可能存在另一个问题。你从未以7作为除数进行测试。

此外,你的测试假定primeContainer已经包含了在10和被测试数字的平方根之间的所有质数。如果你使用线程来填充容器,这可能并不是真实情况。

如果你使用始终递增的数字来填充容器(而你的算法依赖于此),你可以使用std::vector以获得更好的性能,而不是std::set。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接