Windows API线程池简单示例

8
InterlockedIncrement vs EnterCriticalSection/counter++/LeaveCriticalSection 的回答,问题已经得到解决,下面的代码正在正常工作。这应该提供了一个有趣的 Windows 线程池使用的简单示例]

我找不到以下任务的简单示例。例如,我的程序需要将一个巨大的 std::vector 中的值增加 1,因此我希望并行进行。它在程序的生命周期中需要多次执行该操作。我知道如何在每次调用例程时使用 CreateThread 进行操作,但我无法使用 ThreadPool 摆脱 CreateThread。

这是我的做法:

class Thread {
public:
    Thread(){}
    virtual void run() = 0 ; // I can inherit an "IncrementVectorThread"
};
class IncrementVectorThread: public Thread {
public:
   IncrementVectorThread(int threadID, int nbThreads, std::vector<int> &vec) : id(threadID), nb(nbThreads), myvec(vec) { };

   virtual void run() {
        for (int i=(myvec.size()*id)/nb; i<(myvec.size()*(id+1))/nb; i++)
          myvec[i]++; //and let's assume myvec is properly sized
    }
   int id, nb;
   std::vector<int> &myvec;
};

class ThreadGroup : public std::vector<Thread*> {
public:
    ThreadGroup() { 
         pool = CreateThreadpool(NULL);
         InitializeThreadpoolEnvironment(&cbe);
         cleanupGroup = CreateThreadpoolCleanupGroup();
         SetThreadpoolCallbackPool(&cbe, pool);
         SetThreadpoolCallbackCleanupGroup(&cbe, cleanupGroup, NULL);
         threadCount = 0;
    }
    ~ThreadGroup() {
         CloseThreadpool(pool);
}
    PTP_POOL pool;
    TP_CALLBACK_ENVIRON cbe;
    PTP_CLEANUP_GROUP cleanupGroup;
    volatile long threadCount;
} ;


static VOID CALLBACK runFunc(
                PTP_CALLBACK_INSTANCE Instance,
                PVOID Context,
                PTP_WORK Work) {

   ThreadGroup &thread = *((ThreadGroup*) Context);
   long id = InterlockedIncrement(&(thread.threadCount));
   DWORD tid = (id-1)%thread.size();
   thread[tid]->run();
}

void run_threads(ThreadGroup* thread_group) {
    SetThreadpoolThreadMaximum(thread_group->pool, thread_group->size());
    SetThreadpoolThreadMinimum(thread_group->pool, thread_group->size());

    TP_WORK *worker = CreateThreadpoolWork(runFunc, (void*) thread_group, &thread_group->cbe);
    thread_group->threadCount = 0;
    for (int i=0; i<thread_group->size(); i++) {
        SubmitThreadpoolWork(worker);
     }  
     WaitForThreadpoolWorkCallbacks(worker,FALSE);  
     CloseThreadpoolWork(worker);   
}       

void main() {

   ThreadGroup group;
   std::vector<int> vec(10000, 0);
   for (int i=0; i<10; i++)
      group.push_back(new IncrementVectorThread(i, 10, vec));

   run_threads(&group);
   run_threads(&group);
   run_threads(&group);

   // now, vec should be == std::vector<int>(10000, 3);       
}

所以,如果我理解正确:
- CreateThreadpool命令创建了一组线程(因此,调用CreateThreadpoolWork是廉价的,因为它不会调用CreateThread)
- 我可以拥有任意数量的线程池(如果我想为“IncrementVector”和“DecrementVector”线程分别创建一个线程池,我可以这样做)。
- 如果我需要将我的“增量向量”任务分成10个线程,而不是调用10次CreateThread,我可以创建一个单独的“worker”,并将其使用相同的参数10次提交到ThreadPool中(因此,在回调中需要线程ID来知道要递增的std::vector的哪个部分)。在这里,我找不到线程ID,因为GetCurrentThreadId()函数返回线程的实际ID(即类似于1528的东西,而不是0..nb_launched_threads之间的东西)。

最后,我不确定我是否正确理解了这个概念:如果我将我的std::vector分成10个线程,我真的需要一个单独的worker而不是10个吗?

谢谢!

1个回答

7

你的理解基本正确,只有最后一点需要更正。

线程池的整个思想是,你不需要在意它有多少个线程。只需要将大量的任务投入线程池中,让操作系统决定如何执行每个任务块。 因此,如果你创建并提交了10个任务块,操作系统可以从线程池中使用1到10个线程。

你不需要关心这些线程的身份。不要费心去处理线程ID、线程数量的最小值或最大值等信息。

如果你不关心线程的身份,那么如何管理要更改的向量部分呢?很简单,在创建线程池之前,将计数器初始化为零。在回调函数中,调用InterlockedIncrement函数来检索并增加计数器。对于每个提交的工作项,你将获得一个连续的整数。


谢谢 - 已点赞!我按照你的建议做了,但是用上面的简单代码,最终向量中仍然没有得到正确的值 :s - nbonneel
答案重新验证:计数器没有在正确的位置递增 :) 谢谢! - nbonneel
看起来 InterlockedIncrement 没有正确锁定计数器 :s 我尝试了使用 long long 变量的 InterlockedIncrement64,以及带有 _aligned_malloc 对齐内存分配的 InterlockedIncrementAcquire/Release,但我仍然遇到了[可能是]并发问题。 - nbonneel
我得到的整数并不总是严格连续的。 - nbonneel
2
您正在错误地使用 InterlockedIncrement。请参考其他问题以获取解释。 - Raymond Chen
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接