工作线程队列中最轻量级的同步原语

8

我即将实现一个带有工作项队列的工作线程,当我在思考这个问题时,我想知道我是否在做最好的事情。

所谓的线程将需要一些线程本地数据(在构造时预初始化),并且将循环处理工作项,直到满足某些条件为止。

伪代码:

volatile bool run = true;

int WorkerThread(param)
{
    localclassinstance c1 = new c1();
    [other initialization]

    while(true) {
        [LOCK]
        [unqueue work item]
        [UNLOCK]
        if([hasWorkItem]) {
            [process data]
            [PostMessage with pointer to data]
        }
        [Sleep]

        if(!run)
            break;
    }

    [uninitialize]
    return 0;
}

我想我会通过关键部分进行锁定,因为队列将是std::vector或std::queue,但也许有更好的方法。使用Sleep的部分看起来不太好,因为会有很多额外的Sleep和大Sleep值,或者在Sleep值很小的时候会有很多额外的锁定,这是绝对不必要的。但是,我想不到一个可以替代关键部分的WaitForSingleObject友好原语,因为可能会有两个线程同时排队工作项。因此,Event似乎是最佳候选,如果Event已经设置,则可能会丢失第二个工作项,并且它不能保证互斥。也许甚至有更好的方法,使用InterlockedExchange之类的函数,可以导致更少的序列化。附注:在取消排队阶段,我可能需要预处理整个队列并丢弃过时的工作项。

如果您的锁定和解锁不昂贵,那么是否会影响睡眠? - DumbCoder
串行化3或4个线程,烧掉CPU并可能遇到锁定交错不是一个很好的事情。如果我理解正确,没有Sleep或WaitForSingleObject会发生这种情况。但是,也许我错了。 - Coder
工作项是如何被传递到队列中的? - John Dibling
UI线程 -> [锁定], [插入项目], [解锁], [信号] <- 我猜 - Coder
不要自己编写代码,考虑封装Win32线程池API或使用更高级别的框架 :) - snemarch
我很想这样做,但是我必须在此线程上拥有持久的COM对象,因此需要CoCinitialize/CoUninitialize,这要求常规线程。 - Coder
9个回答

5
有多种方法可以实现这个功能。
一种选择是使用信号量来等待。每当将一个值推入队列时,信号量就会被触发,因此工作线程只会在队列中没有任何项时才会阻塞。这仍然需要对队列本身进行单独的同步。
第二种选择是使用手动重置事件,当队列中有项目时设置它,当队列为空时清除它。同样,您需要对队列进行单独的同步。
第三种选择是在线程上创建一个不可见的消息窗口,并使用特殊的 WM_USERWM_APP 消息将项目发布到队列中,通过指针将项目附加到消息中。
另一种选择是使用条件变量。原生的 Windows 条件变量只适用于 Windows Vista 或 Windows 7,但条件变量也可以在 Windows XP 上使用 Boost 或 C++0x 线程库的实现。我的博客上提供了使用 Boost 条件变量的示例队列:http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html

事件解决方案似乎是最好的选择,但不幸的是,由于COM的原因,我需要进行更多的学习和重构。CoInitialize似乎与WaitForSingleObject不兼容。条件变量看起来很不错,但是在这个项目中我没有Boost依赖,并且我必须支持WinXp。 - Coder
如果我没记错的话,COM需要一个消息泵,因此请改用MsgWaitForMultipleObjects,并在收到任何窗口消息时进行消息泵处理。 - Anthony Williams
如果COM是必需的,那么你最好只是发布线程消息,或者使用MESSAGE_HWND类型的窗口作为目标,因为Windows消息泵机制已经在该线程上实例化了。 - Chris Becke
1
是的。具有HWND_MESSAGE父级的仅消息窗口比线程消息好得多,因为除非队列溢出,否则它们不会丢失,而线程消息如果在顶层消息循环之外的任何地方处理消息,则可能会丢失。 - Anthony Williams

3

如果您的情境符合特定要求,完全可以在不使用阻塞锁的情况下在线程之间共享资源。

您需要一个原子指针交换基元,例如Win32的InterlockedExchange。大多数处理器架构都提供某种形式的原子交换,通常比获取正式锁的成本低得多。

您可以将工作项队列存储在可由所有感兴趣的线程访问的指针变量中(全局变量或所有线程都可以访问的对象字段)。

此情境假设所涉及的线程始终有事可做,并且只偶尔“瞥一眼”共享资源。如果您想要一个线程阻塞等待输入的设计,请使用传统的阻塞事件对象。

在任何事情开始之前,请创建您的队列或工作项列表对象并将其分配给共享指针变量。

现在,当生产者想要将某些东西推入队列时,它们通过使用InterlockedExchange将null交换到共享指针变量来“获取”对队列对象的独占访问权限。如果交换的结果返回null,则表示其他人正在修改队列对象。Sleep(0)释放线程的剩余时间片,然后循环重试交换,直到它返回非null。即使您最终循环了几次,这也比调用内核以获取互斥对象快得多。内核调用需要数百个时钟周期才能转换为内核模式。
成功获取指针后,对队列进行修改,然后将队列指针重新交换回共享指针。
从队列中消耗项目时,您也是一样的:将null交换到共享指针并循环,直到获得非null结果,在本地变量中操作对象,然后将其重新交换到共享指针变量中。
这种技术是原子交换和简短自旋循环的组合。它在线程不被阻塞且冲突很少的情况下运行良好。大多数情况下,第一次交换就会给你对共享对象的独占访问权,只要任何线程独占队列对象的时间非常短,则没有线程需要循环多次,直到队列对象再次可用。
如果您预计在您的场景中线程之间存在大量争用,或者您希望设计使线程大部分时间都被阻塞等待工作到达,则最好使用正式的互斥同步对象。

2
最快的锁定原语通常是自旋锁或自旋睡眠锁。CRITICAL_SECTION就是这样一个(用户空间)自旋睡眠锁。(当然,除了根本不使用锁定原语之外。但这意味着使用无锁数据结构,而这些真的很难正确实现。)
至于避免Sleep:看一下条件变量。它们被设计用于与“互斥锁”一起使用,我认为它们比Windows的EVENTs更容易正确使用。
Boost.Thread有一个很好的可移植实现,包括快速的用户空间自旋睡眠锁和条件变量:

http://www.boost.org/doc/libs/1_44_0/doc/html/thread/synchronization.html#thread.synchronization.condvar_ref

一个使用Boost.Thread的工作队列可能看起来像这样:
template <class T>
class Queue : private boost::noncopyable
{
public:
    void Enqueue(T const& t)
    {
        unique_lock lock(m_mutex);

        // wait until the queue is not full
        while (m_backingStore.size() >= m_maxSize)
            m_queueNotFullCondition.wait(lock); // releases the lock temporarily

        m_backingStore.push_back(t);
        m_queueNotEmptyCondition.notify_all(); // notify waiters that the queue is not empty
    }

    T DequeueOrBlock()
    {
        unique_lock lock(m_mutex);

        // wait until the queue is not empty
        while (m_backingStore.empty())
            m_queueNotEmptyCondition.wait(lock); // releases the lock temporarily

        T t = m_backingStore.front();
        m_backingStore.pop_front();

        m_queueNotFullCondition.notify_all(); // notify waiters that the queue is not full

        return t;
    }

private:
    typedef boost::recursive_mutex mutex;
    typedef boost::unique_lock<boost::recursive_mutex> unique_lock;

    size_t const m_maxSize;

    mutex mutable m_mutex;
    boost::condition_variable_any m_queueNotEmptyCondition;
    boost::condition_variable_any m_queueNotFullCondition;

    std::deque<T> m_backingStore;
};

你知道有没有一份解释条件变量的教程吗?我还没有看到过一个条件变量的例子,展示了它们如何比Windows信号量或事件对象更好。当然,有一个相关的临界区对象,但如何利用它来实现某些东西对于未经训练的人来说完全不明显。 - Chris Becke

1

我会稍微重构一下:

WorkItem GetWorkItem()
{
    while(true)
    {
        WaitForSingleObject(queue.Ready);
        {
            ScopeLock lock(queue.Lock);
            if(!queue.IsEmpty())
            {
                return queue.GetItem();
            }
        }
    }
}

int WorkerThread(param) 
{ 
    bool done = false;
    do
    {
        WorkItem work  = GetWorkItem();
        if( work.IsQuitMessage() )
        {
            done = true;
        }
        else
        {
            work.Process();
        }
    } while(!done);

    return 0; 
} 

注意事项:

  1. ScopeLock 是一个 RAII 类,使关键部分的使用更加安全。
  2. 在事件上阻塞,直到工作项(可能)准备就绪 - 然后锁定并尝试出队它。
  3. 不要使用全局的“IsDone”标志,而是将特殊的退出消息 WorkItem 入队。

等待单个对象(queue.Ready); 这是等待事件吗?如果线程1安排了一个项目并发出信号,同时线程2也安排了一个项目并发出信号,那么工作线程将恢复,处理一个项目,重置事件并再次进入WaitForSingleObject,留下一个悬而未决的项目。 RAII和quitmessage是一个不错的想法。 - Coder
1
如果我的记忆没有出错的话,以下代码应该可以工作:使用手动事件重置,并在工作项数量降至零时在dequeue中进行重置。 - snemarch

1

有多种方法可以实现这个功能

首先,您可以创建一个名为“run”的事件,然后使用它来检测线程何时应该终止,主线程随后发出信号。而不是使用sleep,您将使用WaitForSingleObject和超时,这样您将直接退出而不是等待sleep毫秒。

另一种方法是在循环中接受消息,然后发明一个用户定义的消息,将其发布到线程中

编辑:根据情况,还可以明智地拥有另一个监视此线程的线程,以检查它是否已死亡,这可以通过上述提到的消息队列来完成,因此在x毫秒内回复某个消息意味着该线程没有锁定。


1

0

保持信令和同步分离。就像这样...

// in main thread

HANDLE events[2];
events[0] = CreateEvent(...); // for shutdown
events[1] = CreateEvent(...); // for work to do

// start thread and pass the events

// in worker thread

DWORD ret;
while (true)
{
   ret = WaitForMultipleObjects(2, events, FALSE, <timeout val or INFINITE>);

   if shutdown
      return
   else if do-work
      enter crit sec
      unqueue work
      leave crit sec
      etc.
   else if timeout
      do something else that has to be done
}

0
使用信号量而不是事件。

在这种情况下,我认为一个事件就足够了,因为它的本质是布尔值。 - AndersK

0

鉴于这个问题被标记为Windows,我会这样回答:

不要创建一个工作线程。您的工作线程任务可能是独立的,因此您可以同时处理多个任务?如果是这样:

  • 在主线程中调用CreateIOCompletionPort来创建一个io完成端口对象。
  • 创建一个工作线程池。您需要创建的数量取决于您可能想要并行服务的作业数量。 CPU核心数的某个倍数是一个好的开始。
  • 每次有作业进来时,调用PostQueuedCompletionStatus(),将作业结构体的指针作为lpOverlapped结构体传递。
  • 每个工作线程调用GetQueuedCompletionItem() - 从lpOverlapped指针检索工作项并在返回到GetQueuedCompletionStatus之前执行该作业。

这看起来很重,但io完成端口是在内核模式下实现的,并表示可以反序列化为与队列相关联的任何工作线程之一(即等待调用GetQueuedCompletionStatus)。 io完成端口知道正在处理项目的线程中有多少实际上正在使用CPU而不是阻塞在IO调用上 - 并将释放更多的工作线程从池中以确保达到并发计数。

所以,它不是轻量级的,但非常高效... I/O完成端口可以与管道和套接字句柄相关联,例如可以出列这些句柄上异步操作的结果。I/O完成端口设计可以扩展到在单个服务器上处理数万个套接字连接 - 但在桌面PC领域,它们非常方便地扩展了2或4核心上的作业处理。


1
基于IO完成端口的现代C++设计最大的问题在于,进程中的所有线程共享对单个堆的访问,并且STL往往会频繁地占用堆。因此,您可能会发现性能受到每个工作线程之间竞争堆锁的限制,而不是其他任何因素的影响。 - Chris Becke
不幸的是,我必须使用COM组件,而线程池与它们不太兼容。 - Coder
不了解您如何使用COM对象,这使得很难判断。如果COM对象是工作的一部分 - 是的 - 存在问题。如果COM对象用于执行任务,则每个工作线程都是其自己的公寓,您只需要一个TLS存储器来获取与当前线程相关的工作对象。 - Chris Becke

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接