My problem is as follows: I have roughly 1000 outer loops, each containing about 10,000 iterations. The outer loops must run sequentially, but I have 4 CPUs available. I tried splitting each 10,000-iteration loop into 4 loops of 2,500 iterations, i.e. one loop per thread. However, before moving on to the next "big" iteration, I must wait for all 4 small loops to finish, which means I cannot batch the jobs.
The problem is that using the thread pool with 4 threads is much slower than running the jobs sequentially (running a single loop on a separate thread is much slower than running it directly in the main thread).
I'm on Windows, so I create events with CreateEvent() and then wait on one of them with WaitForMultipleObjects(2, handles, false, INFINITE) until the main thread calls SetEvent().
It appears that this whole event handling (together with the critical-section synchronization between threads) is very expensive!
My question: is it normal for events to take this long? If so, is there another mechanism I could use that has less overhead?
Here is some code to illustrate (the relevant parts copied from my thread pool class):
// thread function
unsigned __stdcall ThreadPool::threadFunction(void* params) {
    // some housekeeping
    HANDLE signals[2];
    signals[0] = waitSignal;
    signals[1] = endSignal;
    DWORD waitResult;
    do {
        // wait for one of the signals
        waitResult = WaitForMultipleObjects(2, signals, false, INFINITE);
        // try to get the next job's parameters
        if (tp->getNextJob(threadId, data)) {
            // execute the job
            void* output = jobFunc(data.params);
            // tell the thread pool that we're done and collect the output
            tp->collectOutput(data.ID, output);
        }
        tp->threadDone(threadId);
    } while (waitResult - WAIT_OBJECT_0 == 0);
    // if we reach this point, endSignal was sent, so we are done!
    return 0;
}
// create all threads
for (int i = 0; i < nbThreads; ++i) {
    threadData data;
    unsigned int threadId = 0;
    char eventName[20];
    sprintf_s(eventName, 20, "WaitSignal_%d", i);
    data.handle = (HANDLE) _beginthreadex(NULL, 0, ThreadPool::threadFunction,
                                          this, CREATE_SUSPENDED, &threadId);
    data.threadId = threadId;
    data.busy = false;
    data.waitSignal = CreateEvent(NULL, true, false, eventName);
    this->threads[threadId] = data;
    // start the thread
    ResumeThread(data.handle);
}
// add a job
void ThreadPool::addJob(int jobId, void* params) {
    // housekeeping
    EnterCriticalSection(&(this->mutex));
    // first, insert the parameters in the list
    this->jobs.push_back(job);
    // then, find the first free thread and wake it
    for (it = this->threads.begin(); it != this->threads.end(); ++it) {
        thread = (threadData) it->second;
        if (!thread.busy) {
            this->threads[thread.threadId].busy = true;
            ++(this->nbActiveThreads);
            // wake the thread so that it fetches the next params and runs them
            SetEvent(thread.waitSignal);
            break;
        }
    }
    LeaveCriticalSection(&(this->mutex));
}