C++:线程池比单线程慢?

7

首先,我查看了这个网站上的其他主题,并发现它们与我的问题无关,因为它们大多涉及使用I/O操作或线程创建开销。我的问题是,我的线程池或工作任务结构实现在这种情况下比单线程要慢得多。我对此感到非常困惑,不确定是线程池、任务本身、如何测试它、线程的性质还是某些我无法控制的东西。

// Sorry for the long code
#include <vector>
#include <queue>

#include <thread>
#include <mutex>
#include <future>

#include "task.hpp"

class ThreadPool
{
public:
    ThreadPool()
    {
        for (unsigned i = 0; i < std::thread::hardware_concurrency() - 1; i++)
            m_workers.emplace_back(this, i);

        m_running = true;
        for (auto&& worker : m_workers)
            worker.start();
    }
    ~ThreadPool()
    {
        m_running = false;
        m_task_signal.notify_all();
        for (auto&& worker : m_workers)
            worker.terminate();
    }

    void add_task(Task* task)
    {
        {
            std::unique_lock<std::mutex> lock(m_in_mutex);
            m_in.push(task);
        }
        m_task_signal.notify_one();
    }
private:
    class Worker
    {
    public:
        Worker(ThreadPool* parent, unsigned id) : m_parent(parent), m_id(id)
        {}
        ~Worker()
        {
            terminate();
        }

        void start()
        {
            m_thread = new std::thread(&Worker::work, this);
        }
        void terminate()
        {
            if (m_thread)
            {
                if (m_thread->joinable())
                {
                    m_thread->join();
                    delete m_thread;
                    m_thread = nullptr;
                    m_parent = nullptr;
                }
            }
        }
    private:
        void work()
        {
            while (m_parent->m_running)
            {               
                std::unique_lock<std::mutex> lock(m_parent->m_in_mutex);
                m_parent->m_task_signal.wait(lock, [&]()
                {
                    return !m_parent->m_in.empty() || !m_parent->m_running;
                });

                if (!m_parent->m_running) break;
                Task* task = m_parent->m_in.front();
                m_parent->m_in.pop();
                // Fixed the mutex being locked while the task is executed
                lock.unlock();

                task->execute();            
            }
        }
    private:
        ThreadPool* m_parent = nullptr;
        unsigned m_id = 0;

        std::thread* m_thread = nullptr;
    };
private:
    std::vector<Worker> m_workers;

    std::mutex m_in_mutex;
    std::condition_variable m_task_signal;
    std::queue<Task*> m_in;

    bool m_running = false;
};

class TestTask : public Task
{
public:
    TestTask() {}
    TestTask(unsigned number) : m_number(number) {}

    inline void Set(unsigned number) { m_number = number; }

    void execute() override
    {
        if (m_number <= 3)
        {
            m_is_prime = m_number > 1;
            return;
        }
        else if (m_number % 2 == 0 || m_number % 3 == 0)
        {
            m_is_prime = false;
            return;
        }
        else
        {
            for (unsigned i = 5; i * i <= m_number; i += 6)
            {
                if (m_number % i == 0 || m_number % (i + 2) == 0)
                {
                    m_is_prime = false;
                    return;
                }
            }
            m_is_prime = true;
            return;
        }
    }
public:
    unsigned m_number = 0;
    bool m_is_prime = false;
};

int main()
{
    ThreadPool pool;

    unsigned num_tasks = 1000000;
    std::vector<TestTask> tasks(num_tasks);
    for (auto&& task : tasks)
        task.Set(randint(0, 1000000000));

    auto s = std::chrono::high_resolution_clock::now();
    #if MT
    for (auto&& task : tasks)
        pool.add_task(&task);
    #else
    for (auto&& task : tasks)
        task.execute();
    #endif
    auto e = std::chrono::high_resolution_clock::now();
    double seconds = std::chrono::duration_cast<std::chrono::nanoseconds>(e - s).count() / 1000000000.0;
}

使用VS2013 Profiler 进行基准测试:

10,000,000 tasks:
    MT:
        13 seconds of wall clock time
        93.36% is spent in msvcp120.dll
        3.45% is spent in Task::execute() // Not good here
    ST:
        0.5 seconds of wall clock time
        97.31% is spent with Task::execute()

3
请先展示您“耗时”的代码,说明您如何进行测量以及编译它。这可能很重要。 - deviantfan
@deviantfan 我发现那个错误太晚了。修订后的答案。 - James Nguyen
@RobStarling 英特尔(R) Core(TM) i5-3230M CPU @ 2.60GHz 内核数:2 逻辑处理器数量:4(hardware_concurrency()输出为4) - James Nguyen
1
你的程序不会改变环境(stdio、文件系统、网络、渲染等)。过于智能的编译器理论上可以将你的程序优化为 no-op 或接近 no-op(因为它不会改变可观察结果)。你能发布 ST 变体 main 的反汇编吗? - Ivan Aksamentov - Drop
1
在MT版本中,你不应该等待任务完成吗? - Rob Starling
显示剩余2条评论
3个回答

6

通常在这样的回答中需要声明一下:唯一确定的方法是使用分析工具来测量。

但是我会尝试在没有工具的情况下解释你的结果。首先,你在所有线程中只有一个互斥锁。因此,每次只有一个线程可以执行某个任务。这将抵消你可能获得的所有收益。尽管你使用了多线程,但你的代码完全是串行的。所以至少要使任务执行不受互斥锁的影响。你只需要锁定互斥锁来从队列中获取任务,当任务被执行时就不需要持有它了。

其次,你的任务非常简单,单个线程很快就可以执行完它们。你无法通过这些任务来衡量任何收益。创建一些更复杂的任务,这些任务可以产生一些更有趣的结果(一些接近实际情况的任务,而不是人为构造的任务)。

第三点:线程并不是没有代价的——上下文切换、互斥锁竞争等等。如前两点所述,为了获得真正的收益,你需要有比线程引入的开销更长的任务时间,并且代码应该是真正的并行而不是等待某些资源使其变成串行。

更新:我看错了代码的部分。如果你创建足够大的任务,那么任务已经足够复杂了。


更新2:我已经使用你的代码进行了测试,并找到了一个很好的质数来展示多线程代码的优势。使用以下质数:1019048297。它将提供足够的计算复杂性来显示差异。

但是为什么你的代码没有产生好的结果呢?如果没有看到randint()的实现,很难说清楚,但我认为它非常简单,在一半的情况下返回偶数,在其他情况下也不会产生太大的质数。因此,这些任务非常简单,上下文切换和其他与你特定实现以及线程有关的事情所消耗的时间比计算本身还要多。使用我给你的质数,任务别无选择,只能花时间计算——没有简单的答案,因为这个数字又大又实际是质数。这就是为什么大数字会给你寻求的答案——更好的MT代码时间。


你有没有一些重型任务,但并不是特别难以完成的? - James Nguyen
@James,请看更新后的答案。我看错了你代码的部分。 - ixSci
我已经解决了互斥问题,并使用性能分析器更新了基准测试。 - James Nguyen
我明白了...我的原始目标是将另一个项目中的ST任务卸载。我会看一下这个函数是否值得使用。 - James Nguyen

2

在任务执行期间,您不应该持有互斥锁,否则其他线程将无法获取任务:

void work() {
    while (m_parent->m_running) {   
        Task* currentTask = nullptr;    
        std::unique_lock<std::mutex> lock(m_parent->m_in_mutex);
        m_parent->m_task_signal.wait(lock, [&]() {
            return !m_parent->m_in.empty() || !m_parent->m_running;
        });                     
        if (!m_parent->m_running) continue;
        currentTask = m_parent->m_in.front();
        m_parent->m_in.pop();               
        lock.unlock(); //<- Release the lock so that other threads can get tasks
        currentTask->execute();
        currentTask = nullptr;
    }   
}       

我刚刚根据ixSci的答案修复了这个问题。 - James Nguyen
@James 听到这个消息很好,但请确保将帮助您解决问题的答案标记为已接受。 - CJCombrink

1
对于MT来说,“开销”的每个阶段花费了多少时间:std :: unique_lock,m_task_signal.wait,front,pop,unlock?
基于您的结果仅有3%的有用工作,这意味着以上过程消耗了97%。我会为上述每个部分获取数字(例如,在每个调用之间添加时间戳)。
在我看来,您用于[仅]出队下一个任务指针的代码非常繁重。我会使用更简单的队列[可能是无锁]机制。或者,也可以使用原子操作将索引增加到队列中,而不是上面的五个步骤。例如:
void
work()
{
    while (m_parent->m_running) {
        // NOTE: this is just an example, not necessarily the real function
        int curindex = atomic_increment(&global_index);
        if (curindex >= max_index)
            break;

        Task *task = m_parent->m_in[curindex];

        task->execute();
    }
}

另外,也许你应该每次弹出十个而不是一个。

你可能会遇到内存限制和/或“任务切换”限制。例如,对于访问数组的线程,超过四个线程通常会饱和内存总线。你还可能会遇到锁的严重争用,导致线程被饿死,因为一个线程在间接地垄断锁[即使使用新的unlock调用]。

线程间锁定通常涉及“串行化”操作,其他核心必须同步其乱序执行管道。

这里是一种“无锁”实现:

void
work()
{
    // assume m_id is 0,1,2,...
    int curindex = m_id;

    while (m_parent->m_running) {
        if (curindex >= max_index)
            break;

        Task *task = m_parent->m_in[curindex];

        task->execute();

        curindex += NUMBER_OF_WORKERS;
    }
}

我也有同样的想法,可以推送4个(或任何倍数)任务并通知所有4个线程,并获取一组任务。我有点避免原子操作,因为我还没有完全学会它们,但乍一看它们似乎比我的设置更好。我会尝试一下。 - James Nguyen
这里有一个链接:https://dev59.com/HlwY5IYBdhLWcg3wCUE7。它包含了我创建的CAS实现。但更重要的是,它内部有一个链接指向一个在cppcon上介绍无锁编程的视频讲座。 - Craig Estey

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接