C++11中的线程池

197

相关问题:

关于C++11:

关于Boost:


我该如何获取一个线程池来发送任务而不是一遍又一遍地创建和删除它们?这意味着要有持久的线程来重新同步,而无需加入。


我有下面这段代码:

namespace {
  std::vector<std::thread> workers;

  int total = 4;
  int arr[4] = {0};

  void each_thread_does(int i) {
    arr[i] += 2;
  }
}

int main(int argc, char *argv[]) {
  for (int i = 0; i < 8; ++i) { // for 8 iterations,
    for (int j = 0; j < 4; ++j) {
      workers.push_back(std::thread(each_thread_does, j));
    }
    for (std::thread &t: workers) {
      if (t.joinable()) {
        t.join();
      }
    }
    arr[4] = std::min_element(arr, arr+4);
  }
  return 0;
}

与其在每次迭代中创建和加入线程,我更喜欢将任务发送给工作线程,并仅在第一次迭代时创建它们。


3
这是一个相关的问题和我的答案。 - didierc
1
你考虑过使用tbb吗?(它是英特尔的,但是免费且开源,并且可以完全满足你的需求:你只需要提交(可递归分解)任务,而不必担心线程问题。) - Walter
2
这个 FOSS 项目是我尝试创建一个线程池库的努力,如果你想的话可以去看看。-> code.google.com/p/threadpool11 - Etherealone
使用tbb有什么问题? - Walter
12个回答

199
这是从我的回答到另一个非常相似的帖子进行改编的。
让我们来构建一个ThreadPool类:
class ThreadPool {
public:
    void Start();
    void QueueJob(const std::function<void()>& job);
    void Stop();
    bool busy();

private:
    void ThreadLoop();

    bool should_terminate = false;           // Tells threads to stop looking for jobs
    std::mutex queue_mutex;                  // Prevents data races to the job queue
    std::condition_variable mutex_condition; // Allows threads to wait on new jobs or termination 
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> jobs;
};

ThreadPool::Start
为了实现一个高效的线程池,一旦根据num_threads创建了线程,最好不要创建新的线程或销毁旧的线程(通过join)。这样会有性能损失,甚至可能使您的应用程序比串行版本更慢。因此,我们保持一个线程池,可以随时使用(如果它们还没有运行作业)。
每个线程应该运行自己的无限循环,不断等待新任务并执行。
void ThreadPool::Start() {
    const uint32_t num_threads = std::thread::hardware_concurrency(); // Max # of threads the system supports
    for (uint32_t ii = 0; ii < num_threads; ++ii) {
        threads.emplace_back(std::thread(&ThreadPool::ThreadLoop,this))
    }
}
  • ThreadPool::ThreadLoop
  • 无限循环函数。这是一个while (true)循环,等待任务队列打开。

    void ThreadPool::ThreadLoop() {
        while (true) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(queue_mutex);
                mutex_condition.wait(lock, [this] {
                    return !jobs.empty() || should_terminate;
                });
                if (should_terminate) {
                    return;
                }
                job = jobs.front();
                jobs.pop();
            }
            job();
        }
    }
    

    3. ThreadPool::QueueJob
    将一个新的任务添加到线程池中;使用锁来避免数据竞争。
    void ThreadPool::QueueJob(const std::function<void()>& job) {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            jobs.push(job);
        }
        mutex_condition.notify_one();
    }
    

    使用它的方法:
    thread_pool->QueueJob([] { /* ... */ });
    

    4. ThreadPool::busy 线程池的忙碌状态
    bool ThreadPool::busy() {
        bool poolbusy;
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            poolbusy = !jobs.empty();
        }
        return poolbusy;
    }
    

    busy()函数可以在while循环中使用,以便主线程在调用线程池析构函数之前等待线程池完成所有任务。

    1. ThreadPool::Stop

    停止线程池。

    void ThreadPool::Stop() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            should_terminate = true;
        }
        mutex_condition.notify_all();
        for (std::thread& active_thread : threads) {
            active_thread.join();
        }
        threads.clear();
    }
    

    一旦你整合了这些要素,你就拥有了自己的动态线程池。这些线程始终运行着,等待工作的到来。
    如果有语法错误,我很抱歉,因为我自己输入这段代码,我的记忆力不好。很抱歉我不能提供完整的线程池代码,那将违背我的职业操守。
    备注: - 匿名代码块用于在退出时,使其内部创建的std::unique_lock变量超出作用域,从而释放互斥锁。 - ThreadPool::Stop并不会终止当前正在运行的任务,它只是通过active_thread.join()等待它们完成。

    4
    std::vector 不要求其元素可复制。您可以使用带有移动语义类型(unique_ptrthreadfuture 等)的向量。 - Daniel Langr
    6
    当您终止并且没有剩余工作时会发生什么? - user877329
    6
    "Infinite_loop_function" 是一个有趣的函数名称,它从队列中获取任务并执行它们。 - Solomon Slow
    11
    我猜在Start()方法中,threads.at(i) = std::thread(ThreadLoop);应该改为threads.at(i) = std::thread(&ThreadPool::ThreadLoop, this); - Ignacio Martin
    3
    我认为有一个错误:poolbusy = jobs.empty() 应该是 poolbusy = !jobs.empty() - Peter K
    显示剩余13条评论

    114

    您可以使用C++线程池库,https://github.com/vit-vit/ctpl

    然后,您编写的代码可以被替换为以下内容:

    #include <ctpl.h>  // or <ctpl_stl.h> if ou do not have Boost library
    
    int main (int argc, char *argv[]) {
        ctpl::thread_pool p(2 /* two threads in the pool */);
        int arr[4] = {0};
        std::vector<std::future<void>> results(4);
        for (int i = 0; i < 8; ++i) { // for 8 iterations,
            for (int j = 0; j < 4; ++j) {
                results[j] = p.push([&arr, j](int){ arr[j] +=2; });
            }
            for (int j = 0; j < 4; ++j) {
                results[j].get();
            }
            arr[4] = std::min_element(arr, arr + 4);
        }
    }
    
    您将获得所需数量的线程,并且不会在迭代中一遍又一遍地创建和删除它们。

    21
    这应该是答案;单头文件、易读、直截了当、简明扼要并符合C++11标准的库。做得好! - Jonathan H
    1
    @vit-vit,你能举一个函数的例子吗?如何将类成员函数推入results[j] = p.push([&arr, j](int){ arr[j] +=2; });中? - Hani Goc
    1
    @HaniGoc 只需通过引用捕获实例。 - Jonathan H
    @vit-vit 给你发送了一个拉取请求,以改进STL版本。 - Jonathan H
    @HaniGoc 我试了很久用bind来实现这个...谢天谢地有lambda表达式! >.< - Managarm
    显示剩余3条评论

    74

    线程池意味着所有线程一直在运行,也就是说,线程函数从不返回。为了给线程有意义的任务,你需要设计一个线程间通信系统,既可用于告诉线程有任务要做,也可用于传递实际工作数据。

    通常,这将涉及某种并发数据结构,每个线程可能会在某种条件变量上休眠,当有工作要做时会被通知。接收到通知后,其中一个或多个线程将醒来,从并发数据结构中恢复一个任务,进行处理,并以类似的方式存储结果。

    然后,线程将继续检查是否还有更多的工作要做,如果没有,就回去休眠。

    总之,你必须自己设计所有这些内容,因为没有普遍适用的“工作”概念。这是相当大量的工作,并且有一些微妙的问题需要正确处理。(如果你喜欢,可以使用Go编程语言,它会在后台为你管理线程管理。)


    15
    “你需要自己设计所有这些内容” <- 这正是我试图避免做的。不过,Go协程似乎非常棒。 - Yktula
    3
    @Yktula:这是一项高度非平凡的任务。从您的帖子中甚至无法清楚地了解您需要完成哪种工作,这对于解决方案非常基础。您可以在C++中实现Go,但这将是一个非常具体的事情,有一半的人会抱怨他们需要不同的东西。 - Kerrek SB

    21
    一个线程池本质上是一组线程,都绑定到一个作为事件循环的函数上。这些线程将无休止地等待任务被执行,或者它们自己被终止。
    线程池的工作是提供一个接口来提交作业,定义(并可能修改)运行这些作业的策略(调度规则、线程实例化、池大小),以及监视线程和相关资源的状态。
    因此,对于一个多才多艺的池,必须首先定义什么是任务,它如何启动、中断,结果是什么(有关该问题的 promise 和 future 的概念),线程将需要响应什么样的事件,它们将如何处理这些事件,这些事件将如何与任务处理的事件区分开来。正如您所看到的,这可能变得非常复杂,并对线程的工作方式施加限制,因为解决方案变得越来越复杂。
    目前处理事件的工具相当基础(*):原语如互斥量、条件变量,以及在其上的一些抽象(锁、屏障)。但在某些情况下,这些抽象可能不适用(请参见此相关问题),必须返回使用原语。

    还有其他问题也需要管理:

    • 信号
    • 输入/输出
    • 硬件(处理器亲和力,异构设置)

    这些在您的环境中会如何发挥作用?

    这个答案 回答了类似问题并指向了一个针对boost和stl的现有实现。

    我为另一个问题提供了非常简陋的线程池实现,它没有解决上述许多问题。您可能希望在此基础上进一步开发。您还可以查看其他语言中的现有框架以寻找灵感。


    (*) 我认为这不是问题,相反我认为这正是C++继承自C语言的精神所在。


    "This answer" 链接到问题,我没有找到您所指的答案。 - 463035818_is_not_a_number

    13
    Follwoing [PhD EcE](https://stackoverflow.com/users/3818417/phd-ece) suggestion, I implemented the thread pool:
    

    function_pool.h

    #pragma once
    #include <queue>
    #include <functional>
    #include <mutex>
    #include <condition_variable>
    #include <atomic>
    #include <cassert>
    
    class Function_pool
    {
    
    private:
        std::queue<std::function<void()>> m_function_queue;
        std::mutex m_lock;
        std::condition_variable m_data_condition;
        std::atomic<bool> m_accept_functions;
    
    public:
    
        Function_pool();
        ~Function_pool();
        void push(std::function<void()> func);
        void done();
        void infinite_loop_func();
    };
    

    function_pool.cpp

    #include "function_pool.h"
    
    Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true)
    {
    }
    
    Function_pool::~Function_pool()
    {
    }
    
    void Function_pool::push(std::function<void()> func)
    {
        std::unique_lock<std::mutex> lock(m_lock);
        m_function_queue.push(func);
        // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
        lock.unlock();
        m_data_condition.notify_one();
    }
    
    void Function_pool::done()
    {
        std::unique_lock<std::mutex> lock(m_lock);
        m_accept_functions = false;
        lock.unlock();
        // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
        m_data_condition.notify_all();
        //notify all waiting threads.
    }
    
    void Function_pool::infinite_loop_func()
    {
        std::function<void()> func;
        while (true)
        {
            {
                std::unique_lock<std::mutex> lock(m_lock);
                m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; });
                if (!m_accept_functions && m_function_queue.empty())
                {
                    //lock will be release automatically.
                    //finish the thread loop and let it join in the main thread.
                    return;
                }
                func = m_function_queue.front();
                m_function_queue.pop();
                //release the lock
            }
            func();
        }
    }
    

    主程序.cpp

    #include "function_pool.h"
    #include <string>
    #include <iostream>
    #include <mutex>
    #include <functional>
    #include <thread>
    #include <vector>
    
    Function_pool func_pool;
    
    class quit_worker_exception : public std::exception {};
    
    void example_function()
    {
        std::cout << "bla" << std::endl;
    }
    
    int main()
    {
        std::cout << "stating operation" << std::endl;
        int num_threads = std::thread::hardware_concurrency();
        std::cout << "number of threads = " << num_threads << std::endl;
        std::vector<std::thread> thread_pool;
        for (int i = 0; i < num_threads; i++)
        {
            thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool));
        }
    
        //here we should send our functions
        for (int i = 0; i < 50; i++)
        {
            func_pool.push(example_function);
        }
        func_pool.done();
        for (unsigned int i = 0; i < thread_pool.size(); i++)
        {
            thread_pool.at(i).join();
        }
    }
    

    3
    谢谢!这真的帮助我开始并行线程操作。最终,我使用了稍微修改过的你的实现版本。 - Robbie Capps
    2
    您不需要将 m_accept_functions 设为原子类型。可以通过互斥锁来保护 m_accept_functions - dmikos
    我们可以调用join()是很好的。 - moi

    11

    您可以使用来自boost库的thread_pool

    void my_task(){...}
    
    int main(){
        int threadNumbers = thread::hardware_concurrency();
        boost::asio::thread_pool pool(threadNumbers);
    
        // Submit a function to the pool.
        boost::asio::post(pool, my_task);
    
        // Submit a lambda object to the pool.
        boost::asio::post(pool, []() {
          ...
        });
    }
    
    你也可以使用来自开源社区的线程池
    void first_task() {...}    
    void second_task() {...}
    
    int main(){
        int threadNumbers = thread::hardware_concurrency();
        pool tp(threadNumbers);
    
        // Add some tasks to the pool.
        tp.schedule(&first_task);
        tp.schedule(&second_task);
    }
    

    4

    类似这样的东西可能会有所帮助(来自一个工作中的应用程序)。

    #include <memory>
    #include <boost/asio.hpp>
    #include <boost/thread.hpp>
    
    struct thread_pool {
      typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;
    
      thread_pool(int threads) :service(), service_worker(new asio_worker::element_type(service)) {
        for (int i = 0; i < threads; ++i) {
          auto worker = [this] { return service.run(); };
          grp.add_thread(new boost::thread(worker));
        }
      }
    
      template<class F>
      void enqueue(F f) {
        service.post(f);
      }
    
      ~thread_pool() {
        service_worker.reset();
        grp.join_all();
        service.stop();
      }
    
    private:
      boost::asio::io_service service;
      asio_worker service_worker;
      boost::thread_group grp;
    };
    

    您可以像这样使用它:
    thread_pool pool(2);
    
    pool.enqueue([] {
      std::cout << "Hello from Task 1\n";
    });
    
    pool.enqueue([] {
      std::cout << "Hello from Task 2\n";
    });
    

    请记住,重新发明一个高效的异步队列机制并不容易。

    Boost::asio::io_service 是一个非常高效的实现,实际上它是一组特定于平台的包装器(例如,在 Windows 上它使用了 I/O 完成端口)。


    2
    C++11真的需要那么多的boost库吗?使用std::thread不就足够了吗? - einpoklum
    std中没有与boost::thread_group相对应的东西。boost::thread_groupboost::thread实例的集合。但是,用std::threadvector很容易替换boost::thread_group - rustyx

    4

    编辑:现在需要C++17和概念。(截至2016年9月12日,只有g++ 6.0+是足够的。)

    由于此功能,模板推断更加准确,所以值得努力获取更新的编译器。我还没有找到需要显式模板参数的函数。

    它现在也可以接受任何适当的可调用对象(并且仍然是静态类型安全的!!!)。

    它还包括一个可选的绿色线程优先级线程池,使用相同的API。但是这个类只支持POSIX。它使用ucontext_t API用于用户空间任务切换。


    我为此创建了一个简单的库。下面给出了使用示例。(我回答这个问题是因为在决定自己编写之前,这是我发现的其中一件事情。)

    bool is_prime(int n){
      // Determine if n is prime.
    }
    
    int main(){
      thread_pool pool(8); // 8 threads
    
      list<future<bool>> results;
      for(int n = 2;n < 10000;n++){
        // Submit a job to the pool.
        results.emplace_back(pool.async(is_prime, n));
      }
    
      int n = 2;
      for(auto i = results.begin();i != results.end();i++, n++){
        // i is an iterator pointing to a future representing the result of is_prime(n)
        cout << n << " ";
        bool prime = i->get(); // Wait for the task is_prime(n) to finish and get the result.
        if(prime)
          cout << "is prime";
        else
          cout << "is not prime";
        cout << endl;
      }  
    }
    

    您可以传递任何带有任何(或无)返回值和任何(或无)参数的函数给 async,它将返回相应的 std::future。要获取结果(或仅等待任务完成),请在该 future 上调用 get()

    这是Github链接: https://github.com/Tyler-Hardin/thread_pool.


    1
    看起来很棒,但是与vit-vit的标题进行比较会更好! - Jonathan H
    1
    @Sh3ljohn,从快速浏览来看,它们在API方面基本相同。vit-vit使用boost的无锁队列,比我的更好。(但是我的目标是仅使用std::*来实现。我想我可以自己实现无锁队列,但这听起来很困难和容易出错。)此外,vit-vit的没有相关的.cpp文件,对于不知道自己在做什么的人来说更简单。(例如https://github.com/Tyler-Hardin/thread_pool/issues/1) - Tyler
    他/她还有一个仅使用STL的解决方案,我已经在分叉它几个小时了。起初,它看起来比你的解决方案更复杂,到处都是共享指针,但实际上这是为了正确处理热重置而需要的。 - Jonathan H
    @Sh3ljohn,啊,我没有注意到热重置。那很好。我选择不担心它,因为它不是预期用例中的真正问题。(个人而言,我想不出一个需要调整大小的情况,但这可能是由于缺乏想象力。) - Tyler
    1
    示例用例:您正在服务器上运行一个RESTful API,并且需要在维护期间暂时减少资源分配,而无需完全关闭服务。 - Jonathan H

    1

    看起来线程池是一个非常流行的问题/练习 :-)

    我最近用现代C++写了一个,它是我的财产,并且公开可用于此处 - https://github.com/yurir-dev/threadpool

    它支持模板化返回值、核心钉住、一些任务的排序。所有实现都在两个 .h 文件中。

    因此,原始问题将会是这样的:

    #include "tp/threadpool.h"
    
    int arr[5] = { 0 };
    
    concurency::threadPool<void> tp;
    tp.start(std::thread::hardware_concurrency());
    
    std::vector<std::future<void>> futures;
    for (int i = 0; i < 8; ++i) { // for 8 iterations,
        for (int j = 0; j < 4; ++j) {
            futures.push_back(tp.push([&arr, j]() {
                   arr[j] += 2;
                }));
        }
    }
    
    // wait until all pushed tasks are finished.
    for (auto& f : futures)
        f.get();
    // or just tp.end(); // will kill all the threads
    
    arr[4] = *std::min_element(arr, arr + 4);
    

    在引用自己的内容时,请务必阅读Stack Overflow的自我推广政策 - Jeremy Caney
    1
    @JeremyCaney有什么问题吗?他并没有销售任何东西,只是展示他公开可用的FOSS库。 - original.roland
    如果您对自我推广规则有疑问,我建议您在Meta Stack Exchange上提出。 - Jeremy Caney
    @JeremyCaney,我对自我推广规则没有疑问,完全支持,只是看不出这个答案会违反任何规则。或者你只是想随意提醒yurir阅读政策吗? - original.roland
    1
    @original.roland:至少,在引用自己的内容时,他们应该承认这是他们自己的内容。在这种情况下,这并不是特别大的问题,也很容易解决,这就是为什么我提醒他们遵守政策而没有标记答案的原因。我的假设只是他们不知道这个政策。然而,他们应该编辑他们的答案以承认他们是链接存储库的所有者。 - Jeremy Caney
    显示剩余2条评论

    1

    您可以使用单头文件库task-thread-pool,然后您的代码就变成了:

    #include <algorithm> // for std::min_element
    #include <task_thread_pool.hpp>
    
    int main () {
        task_thread_pool::task_thread_pool pool;
    
        int arr[5] = {0};  // not arr[4] because original code had an out-of-bounds error accessing arr[4].
    
        for (int i = 0; i < 8; ++i) { // for 8 iterations,
            for (int j = 0; j < 4; ++j) {
                pool.submit_detach([&arr, j]{ arr[j] += 2; });
            }
    
            // Wait for all tasks to complete.
            // Could also use submit() which returns a future,
            // but then we'd have to call get() on all those futures.
            pool.wait_for_tasks();
    
            arr[4] = *std::min_element(arr, arr + 4);
        }
    
        return 0;
    }
    

    这将创建并重复使用线程,直到所有任务完成。

    适用于C++11及更高版本。


    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接