C++11 动态线程池

9

最近,我一直在寻找一个用于线程并发任务的库。理想情况下,它应该有一个简单的接口,可以在线程上调用函数。任何时候都有n个线程,有些线程比其他线程更快地完成任务并在不同的时间到达。

起初,我尝试使用Rx,它在c++中非常好用。我还研究了Blocks和TBB,但它们都是平台相关的。对于我的原型,我需要保持独立于平台,因为我们还不知道它将在什么上运行,并且当做出决定时可能会更改。

C++11有许多关于线程和并发的内容,我找到了很多示例,比如这个线程池的示例:

https://github.com/bilash/threadpool

类似的项目使用相同的lambda表达式和std::thread以及std::mutex。

这看起来非常适合我所需的。但是也存在一些问题。线程池是由定义数量的线程启动的,任务被排队,直到线程空闲。

如何添加新线程?删除过期线程?(.Join()??)

显然,对于已知数量的线程来说,这要容易得多,因为它们可以在构造函数中初始化,然后在析构函数中加入(join())。

对于有C++并发经验的人,有什么技巧或指针可以提供吗?


GCD 或 libdispatch http://en.wikipedia.org/wiki/Grand_Central_Dispatch - Bryan Chen
1
我使用Boost :: Asio来创建一个简单的线程池。也许你想看一下这个示例https://dev59.com/xmYq5IYBdhLWcg3wtCvO - Haatschii
是的。我已经了解了GCD。有关C++中的示例或教程资源吗?Boost在各个平台上都受支持吗? - WebSight
我从未在使用Boost时遇到过任何移植问题。如果我们谈论的是WIN/Linux/OSX,那么完全没有问题。 - Haatschii
2个回答

10
  1. Start with maximum number of threads a system can support:

    int Num_Threads =  thread::hardware_concurrency();
    
  2. For an efficient threadpool implementation, once threads are created according to Num_Threads, it's better not to create new ones, or destroy old ones (by joining). There will be performance penalty, might even make your application goes slower than the serial version.

    Each C++11 thread should be running in their function with an infinite loop, constantly waiting for new tasks to grab and run.

    Here is how to attach such function to the thread pool:

    int Num_Threads = thread::hardware_concurrency();
    vector<thread> Pool;
    for(int ii = 0; ii < Num_Threads; ii++)
    {  Pool.push_back(thread(Infinite_loop_function));}
    
  3. The Infinite_loop_function

    This is a "while(true)" loop waiting for the task queue

    void The_Pool:: Infinite_loop_function()
    {
        while(true)
        {
            {
                unique_lock<mutex> lock(Queue_Mutex);
    
                condition.wait(lock, []{return !Queue.empty()});
                Job = Queue.front();
                Queue.pop();
            }
            Job(); // function<void()> type
        }
    };
    
  4. Make a function to add job to your Queue

    void The_Pool:: Add_Job(function<void()> New_Job)
    {
        {
            unique_lock<mutex> lock(Queue_Mutex);
            Queue.push(New_Job);
        }
        condition.notify_one();
    }
    
  5. Bind an arbitrary function to your Queue

    Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object));
    

一旦您集成了这些元素,就拥有了自己的动态线程池。这些线程始终运行,等待任务。


0
这应该很容易使用:https://pocoproject.org/docs/Poco.ThreadPool.html
线程池始终保持一定数量的线程运行,随时准备接受工作。创建和启动线程可能会对应用程序造成显著的运行时开销。线程池通过减少必须创建(和再次销毁)的线程数量来提高应用程序的性能。线程池中的线程在再次可用时被重复使用。线程池始终保持最小数量的线程运行。如果对线程的需求增加,则会创建额外的线程。一旦对线程的需求再次下降,不再使用的线程将停止并从池中删除。
ThreadPool(
    int minCapacity = 2,
    int maxCapacity = 16,
    int idleTime = 60,
    int stackSize = 0
);

这是一个非常好用的库,易于使用,不像 Boost :(

https://github.com/pocoproject/poco


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接