在C++中循环中重复使用线程

33
我需要在C++程序中并行处理一些任务,但我对并行编程完全不熟悉。目前通过互联网搜索已经有了一些进展,但现在卡住了。我想在循环中重用一些线程,但显然不知道如何实现我想要的功能。
我正在从计算机上的两个ADC卡获取数据(并行获取),然后需要对收集到的数据执行一些操作(并行处理),同时收集下一批数据。以下是一些伪代码以说明问题。
//Acquire some data, wait for all the data to be acquired before proceeding
std::thread acq1(AcquireData, boardHandle1, memoryAddress1a);
std::thread acq2(AcquireData, boardHandle2, memoryAddress2a);
acq1.join();
acq2.join();

while(user doesn't interrupt)
{

//Process first batch of data while acquiring new data
std::thread proc1(ProcessData,memoryAddress1a);
std::thread proc2(ProcessData,memoryAddress2a);
acq1(AcquireData, boardHandle1, memoryAddress1b);
acq2(AcquireData, boardHandle2, memoryAddress2b);
acq1.join();
acq2.join();
proc1.join();
proc2.join();
/*Proceed in this manner, alternating which memory address 
is written to and being processed until the user interrupts the program.*/
}

那就是它的主旨。循环的下一次运行将在处理“b”数据时写入“a”内存地址,并继续交替进行(我可以让代码这样做,只是为了防止问题混乱而将其删除)。
无论如何,问题(我相信有些人已经可以看出来了)是第二次尝试使用acq1和acq2时,编译器(VS2012)会提示“IntelliSense:调用类类型的对象时缺少适当的operator()或转换函数为指向函数类型的指针”。同样,如果我再次在acq1和acq2前面放置std::thread,它会说“错误C2374:'acq1':重新定义;多重初始化”。
因此,问题是,当线程完成其先前的任务后,我是否可以将线程重新分配给新任务?我总是等待线程的上一个使用结束,然后再次调用它,但我不知道如何重新分配线程,并且由于它在一个循环中,我不能每次都创建一个新线程(或者如果我可以,那似乎是浪费和不必要的,但我可能是错的)。
提前感谢您的回答。
6个回答

57
最简单的方法是使用一个可等待队列(waitable queue),其中包含std::function对象。就像这样:


#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <chrono>


class ThreadPool
{
    public:

    ThreadPool (int threads) : shutdown_ (false)
    {
        // Create the specified number of threads
        threads_.reserve (threads);
        for (int i = 0; i < threads; ++i)
            threads_.emplace_back (std::bind (&ThreadPool::threadEntry, this, i));
    }

    ~ThreadPool ()
    {
        {
            // Unblock any threads and tell them to stop
            std::unique_lock <std::mutex> l (lock_);

            shutdown_ = true;
            condVar_.notify_all();
        }

        // Wait for all threads to stop
        std::cerr << "Joining threads" << std::endl;
        for (auto& thread : threads_)
            thread.join();
    }

    void doJob (std::function <void (void)> func)
    {
        // Place a job on the queu and unblock a thread
        std::unique_lock <std::mutex> l (lock_);

        jobs_.emplace (std::move (func));
        condVar_.notify_one();
    }

    protected:

    void threadEntry (int i)
    {
        std::function <void (void)> job;

        while (1)
        {
            {
                std::unique_lock <std::mutex> l (lock_);

                while (! shutdown_ && jobs_.empty())
                    condVar_.wait (l);

                if (jobs_.empty ())
                {
                    // No jobs to do and we are shutting down
                    std::cerr << "Thread " << i << " terminates" << std::endl;
                    return;
                 }

                std::cerr << "Thread " << i << " does a job" << std::endl;
                job = std::move (jobs_.front ());
                jobs_.pop();
            }

            // Do the job without holding any locks
            job ();
        }

    }

    std::mutex lock_;
    std::condition_variable condVar_;
    bool shutdown_;
    std::queue <std::function <void (void)>> jobs_;
    std::vector <std::thread> threads_;
};

void silly (int n)
{
    // A silly job for demonstration purposes
    std::cerr << "Sleeping for " << n << " seconds" << std::endl;
    std::this_thread::sleep_for (std::chrono::seconds (n));
}

int main()
{
    // Create two threads
    ThreadPool p (2);

    // Assign them 4 jobs
    p.doJob (std::bind (silly, 1));
    p.doJob (std::bind (silly, 2));
    p.doJob (std::bind (silly, 3));
    p.doJob (std::bind (silly, 4));
}

2
简洁明了的KISS解决方案,没有多余的修饰。 - Surt
这更像是一个问题,而不仅仅是一个问题,但是在你的doJob方法中,当你调用condVar_.notify_one()时,锁会在之前(或者在之前)被释放吗?我问这个问题的原因是因为我的直觉告诉我,可能会在通知的线程唤醒之前,尝试获取锁并在执行doJob的线程实际释放锁之前失败。 - David
1
@David 恰恰相反,这将是一个巨大的性能优势。调度程序不会安排想要争夺锁的线程,而是会安排其他可以在没有任何争用的情况下运行的线程。只有当系统没有有用的任务可执行时才会有性能损失,在这种情况下,我们并不真的关心性能。 - David Schwartz
1
@AndrewPilikin 遵循您的编译器指令来编译使用pthread的代码。也许需要加上“-pthread”参数? - David Schwartz
1
@DrPhil,因为“shutdown_”只是一个普通的“bool”,所以不会有任何区别。 - David Schwartz
显示剩余14条评论

20
std::thread类的设计初衷是执行一个任务(在构造函数中给定),然后结束。如果您想做更多的工作,就需要一个新线程。从C++11开始,这就是我们拥有的全部内容。线程池没有进入标准。(我不确定C++14对它们有什么要说的。)
幸运的是,您可以很容易地自己实现所需的逻辑。这里是大致的图片:
- 启动执行以下操作的n个工作线程: - 只要还有更多的工作要做,就重复以下步骤: - 获取下一个任务t(可能会等待直到有任务可用)。 - 处理任务t。 - 在处理队列中插入新任务。 - 告诉工作线程没有更多任务要做。 - 等待工作线程完成。
最困难的部分(仍然相当容易)是正确地设计工作队列。通常,同步链接列表(来自STL)将为此做到。同步意味着希望操作队列的任何线程在获取std::mutex之后才能这样做,以避免竞争条件。如果工作线程发现列表为空,则必须等待直到再次有工作。您可以使用std::condition_variable进行此操作。每次将新任务插入队列时,插入线程都会通知等待在条件变量上的线程,因此将停止阻止并最终开始处理新任务。
第二个不太复杂的部分是如何向工作线程发出没有更多工作要做的信号。显然,您可以设置一些全局标志,但如果一个工作线程被阻塞等待在队列中,它不会很快意识到。一种解决方案可能是notify_all()线程,并在每次被通知时检查标志。另一个选择是将某些不同的“有毒”项目插入队列中。如果一个工作线程遇到这个项目,它就会退出。
使用自定义的task对象或简单的lambda表示任务队列是直接的。
以上全部都是C++11功能。如果您陷入早期版本的困境,则需要求助于为特定平台提供多线程的第三方库。
虽然这些都不是火箭科学,但第一次很容易出错。不幸的是,与并发相关的错误是最难调试的。首先花几个小时阅读好书的相关章节或通过教程进行培训可以很快得到回报。

谢谢你的回答,非常详细和写得很好。如果下面luk32的回复对我不起作用,我会把这个作为备选方案B保存下来。你的方法可能是正确的做法。 - notaCSmajor

0

这个

 std::thread acq1(...)

是调用构造函数的语句。它正在创建一个名为acq1的新对象。

这个

  acq1(...)

对现有对象 aqc1 应用 () 操作符。如果没有为 std::thread 定义这样的操作符,则编译器会报错。

据我所知,您可能无法重复使用 std::threads。您构造和启动它们。与它们连接并将其丢弃。


谢谢Oncaphillis,结合你的回答和luk32的,我想我意识到了我的语法错误在哪里。 - notaCSmajor

0

这要看你是否认为移动是重新分配还是不重新分配。你可以移动一个线程,但不能复制它。

下面的代码将在每次迭代中创建新的一对线程,并将它们移动到旧线程的位置。我想这应该可以工作,因为新的thread对象将是临时的。

while(user doesn't interrupt)
{
//Process first batch of data while acquiring new data
std::thread proc1(ProcessData,memoryAddress1a);
std::thread proc2(ProcessData,memoryAddress2a);
acq1 = std::thread(AcquireData, boardHandle1, memoryAddress1b);
acq2 = std::thread(AcquireData, boardHandle2, memoryAddress2b);
acq1.join();
acq2.join();
proc1.join();
proc2.join();
/*Proceed in this manner, alternating which memory address 
is written to and being processed until the user interrupts the program.*/
}

发生的情况是,对象实际上并没有在迭代结束时结束其生命周期,因为它在循环中相对于外部作用域声明。但每次都会创建一个新对象并进行移动。我不知道有什么可以节省的(可能是我太蠢了),所以我想这与在循环内声明acq并简单地重用符号完全相同。总之...是关于如何分类创建临时和move的问题。
此外,这显然会在每个循环中启动一个新线程(当然会结束先前分配的线程),它不会使线程等待新数据并神奇地将其馈送到处理管道中。您需要像这样实现它:工作线程池和通过队列进行通信。
参考文献:operator=, (ctor)
我认为您得到的错误是不言自明的,所以我会跳过解释它们。

谢谢,我尝试了一下,看起来它能够按照需要工作。时间会告诉我们是否存在任何问题,但目前它已经极大地帮助了我! - notaCSmajor

-1

我认为你需要一个更简单的答案来多次运行一组线程,这是最好的解决方案:

do{

    std::vector<std::thread> thread_vector;

     for (int i=0;i<nworkers;i++)
     {
       thread_vector.push_back(std::thread(yourFunction,Parameter1,Parameter2, ...));
    }

    for(std::thread& it: thread_vector)
    { 
      it.join();
    }
   q++;
} while(q<NTIMES);

-1

你也可以创建自己的线程类,并调用它的run方法,例如:

class MyThread
{
public:
void run(std::function<void()> func) {
   thread_ = std::thread(func);
}
void join() {
   if(thread_.joinable())
      thread_.join();
}
private:
   std::thread thread_;
};

// Application code...
MyThread myThread;
myThread.run(AcquireData);

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接