使用本地Windows API实现的Win32线程安全队列实现

3
由于Windows中缺少条件变量(尽管自Vista以来引入,但不支持Windows XP和2003),因此在C++中实现线程安全队列并不是很容易。可以参考在Win32上实现POSIX条件变量的策略。我所需要的仅是使用CriticalSection或Mutex和Event而不是semaphore和条件变量。
我也试图找到一个只使用Win32本地API的确切实现,但没有成功。因此,我自己完成了一个。问题是,我不能确定这段代码是否是线程安全的。谁能告诉我它是否可行?
class CEventSyncQueue
{
public:
    CEventSyncQueue(int nCapacity = -1);
    virtual ~CEventSyncQueue();
    virtual void Put(void* ptr);
    virtual void* Get();
protected:
    int m_nCapacity;
    CPtrList m_list;

    CRITICAL_SECTION m_lock;    
    HANDLE m_hGetEvent;
    HANDLE m_hPutEvent;
};

CEventSyncQueue::CEventSyncQueue(int nCapacity)
{
    m_nCapacity = nCapacity;

    ::InitializeCriticalSection(&m_lock);
    m_hPutEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
    m_hGetEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
}

CEventSyncQueue::~CEventSyncQueue()
{
    m_list.RemoveAll();

    ::CloseHandle(m_hGetEvent);
    ::CloseHandle(m_hPutEvent);

    ::DeleteCriticalSection(&m_lock);
}

void CEventSyncQueue::Put(void* ptr)
{
    ::EnterCriticalSection(&m_lock);

    while(m_nCapacity > 0 && m_list.GetCount() >= m_nCapacity)
    {
        ::LeaveCriticalSection(&m_lock);

        //wait
        if(::WaitForSingleObject(m_hPutEvent, INFINITE) != WAIT_OBJECT_0)
        {
            ASSERT(FALSE);
        }

        ::EnterCriticalSection(&m_lock);
    }
    if(m_nCapacity > 0)
    {
        ASSERT(m_list.GetCount() < m_nCapacity);
    }
    m_list.AddTail(ptr);

    ::SetEvent(m_hGetEvent);    //notifyAll
    ::LeaveCriticalSection(&m_lock);
}
void* CEventSyncQueue::Get()
{
    ::EnterCriticalSection(&m_lock);

    while(m_list.IsEmpty())
    {
        ::LeaveCriticalSection(&m_lock);

        //wait
        if(::WaitForSingleObject(m_hGetEvent, INFINITE) != WAIT_OBJECT_0)
        {
            ASSERT(FALSE);
        }

        ::EnterCriticalSection(&m_lock);
    }
    ASSERT(!m_list.IsEmpty());
    void* ptr = m_list.RemoveHead();

    ::SetEvent(m_hPutEvent);    //notifyAll
    ::LeaveCriticalSection(&m_lock);

    return ptr;
}

我注意到PostThreadMessage是其中一个允许的函数。假设您的waveOutProc函数只是将项目添加到队列中,而不是删除它们,那么这可能是一种更简单的方法。 - Harry Johnston
是的,PostThreadMessage是允许的。但这将需要一个新线程来接收和处理消息,这会使事情变得更加复杂。我在这里提出这个问题的原因是我想澄清事件可以做什么,不能做什么。如何解决waveOutProc中的问题并不是重点。 - gelu
@gelu - 如果你的基于事件的P-C队列(如果它能工作)需要一个新线程来接收和处理来自队列的消息。PostMessage()/PostThreadMessage()通信系统需要一个while循环和GetMessage() - 有多简单?“如何解决waveOutProc中的问题并不是重点” - 你的帖子现在有点循环了。再次强调 - 事件不是构建生产者-消费者队列的适当同步原语(除了某些奇特的情况,其中必须选择要运行的特定消费者线程)。 - Martin James
是的,我同意你的观点。事件并不是实现生产者-消费者队列的适当方式。我现在确信我贴出的代码是错误的。当有超过2个P线程或2个C线程时,它会导致死锁。似乎只使用事件来实现P-C队列是困难的(也许是不可能的)。 - gelu
哦,我相信只用事件和临界区就可以做到这一点,但很难做到正确 - 而且更难确定你已经做对了。一种方法是使用它们来实现自己的信号量。 - Harry Johnston
@HarryJohnston, 你能分享一个正确的例子给我吗? - gelu
3个回答

2

在Windows中实现线程安全队列是非常容易的。我已经在Delphi、C++、BCB等语言中完成了它。

你为什么认为需要条件变量?你认为Windows消息队列是如何工作的?

对于P-C队列来说,事件是错误的原语。最简单/清晰的方法是使用信号量。

简单的无限制生产者-消费者队列。

template <typename T> class PCSqueue{
    CRITICAL_SECTION access;
    deque<T> *objectQueue;
    HANDLE queueSema;
public:
    PCSqueue(){
        objectQueue=new deque<T>;
        InitializeCriticalSection(&access);
        queueSema=CreateSemaphore(NULL,0,MAXINT,NULL);
    };
    void push(T ref){
        EnterCriticalSection(&access);
        objectQueue->push_front(ref);
        LeaveCriticalSection(&access);
        ReleaseSemaphore(queueSema,1,NULL);
    };
    bool pop(T *ref,DWORD timeout){
        if (WAIT_OBJECT_0==WaitForSingleObject(queueSema,timeout)) {
            EnterCriticalSection(&access);
            *ref=objectQueue->back();
            objectQueue->pop_back();
            LeaveCriticalSection(&access);
            return(true);
        }
        else
            return(false);
    };
};

编辑 - 一个有界队列不会更加困难 - 您需要另一个信号量来计算空间。我不使用有界队列,但我相信它是可以的 - 带有2个信号量和一个互斥/临界区的有界队列是一种标准模式。

编辑:使用PostMessage()或PostThreadMessage() API调用 - 它们明确声明在'waveOutProc'回调中是安全的。MSDN表示调用'其他波形函数'将导致死锁 - 信号量调用不在该集合中,如果SetEvent()被允许而ReleaseSemaphore()不被允许,我会非常惊讶。事实上,我会惊讶地发现在Windows中任何地方都不允许SetEvent()而不允许ReleaseSemaphore()。


3
为什么你不能使用信号量? - Harry Johnston
除了@HarryJohnston的评论外,我对使用事件的有界队列在多个生产者/消费者情况下100%安全没有太大信心。 我必须进行测试,但我没有动力这样做,因为使用信号量更简单且肯定有效。 自W95以来,Windows就具有本机API信号量支持。 在我的示例中,“CreateSemaphore()”等调用很容易在MSDN上找到。 - Martin James
信号量是这种情况下理想且简单的方式。我在想事件能否替代信号量。是或不是,这让我头疼。 - gelu
楼上说得很对,只有一小部分特定的Win32函数可以在waveOutProc内使用。 - Harry Johnston
啊啊啊!!!在允许调用列表中有'PostMessage,PostThreadMessage' - 生产者-消费者队列调用!!!我并不相信信号量调用无论如何都不起作用,无论MSDN对于这个回调说什么。信号量通常用于与驱动程序进行通信 - 在许多操作系统中,它们几乎是从驱动程序中安全调用的唯一方法! - Martin James
显示剩余7条评论

0

仔细想想,其实没有必要显式地实现一个信号量。相反,只需考虑如何使用事件来实现信号量,并以此方式解决问题。我的第一次尝试使用了手动重置事件,这是低效但明显正确的,然后我进行了优化。

请注意,我还没有调试(甚至没有编译!)这两个代码片段,但它们应该能给你正确的思路。这是手动重置版本:

class CEventSyncQueue
{
public:
    CEventSyncQueue(int nCapacity = -1);
    virtual ~CEventSyncQueue();
    virtual void Put(void* ptr);
    virtual void* Get();
protected:
    int m_nCapacity;
    CPtrList m_list;

    CRITICAL_SECTION m_lock;    
    HANDLE m_queue_not_empty;
    HANDLE m_queue_not_full;
};

CEventSyncQueue::CEventSyncQueue(int nCapacity)
{
    m_nCapacity = nCapacity;
    ::InitializeCriticalSection(&m_lock);
    m_queue_not_empty = ::CreateEvent(NULL, TRUE, FALSE, NULL);
    m_queue_not_full = ::CreateEvent(NULL, TRUE, TRUE, NULL);
}

CEventSyncQueue::~CEventSyncQueue()
{
    m_list.RemoveAll();
    ::CloseHandle(m_queue_not_empty);
    ::CloseHandle(m_queue_not_full);
    ::DeleteCriticalSection(&m_lock);
}

void CEventSyncQueue::Put(void* ptr)
{
    bool done = false;
    while (!done)
    {
        // If the queue is full, we must wait until it isn't.
        if(::WaitForSingleObject(m_queue_not_full, INFINITE) != WAIT_OBJECT_0)
        {
            ASSERT(FALSE);
        }

        // However, we might not be the first to respond to the event,
        // so we still need to check whether the queue is full and loop
        // if it is.
        ::EnterCriticalSection(&m_lock);
        if (m_nCapacity <= 0 || m_list.GetCount() < m_nCapacity)
        {
            m_list.AddTail(ptr);
            done = true;
            // The queue is definitely not empty.
            SetEvent(m_queue_not_empty);
            // Check whether the queue is now full.
            if (m_nCapacity > 0 && m_list.GetCount() >= m_nCapacity)
            {
                ResetEvent(m_queue_not_full);
            }
        }
        ::LeaveCriticalSection(&m_lock);
    }
}

void* CEventSyncQueue::Get()
{
    void *result = nullptr;
    while (result == nullptr)
    {
        // If the queue is empty, we must wait until it isn't.
        if(::WaitForSingleObject(m_queue_not_empty, INFINITE) != WAIT_OBJECT_0)
        {
            ASSERT(FALSE);
        }

        // However, we might not be the first to respond to the event,
        // so we still need to check whether the queue is empty and loop
        // if it is.
        ::EnterCriticalSection(&m_lock);
        if (!m_list.IsEmpty())
        {
            result = m_list.RemoveHead();
            ASSERT(result != nullptr);
            // The queue shouldn't be full at this point!
            ASSERT(m_nCapacity <= 0 || m_list.GetCount() < m_nCapacity);
            SetEvent(m_queue_not_full);
            // Check whether the queue is now empty.
            if (m_list.IsEmpty())
            {
                ResetEvent(m_queue_not_empty);
            }
        }
        ::LeaveCriticalSection(&m_lock);
    }
}

这里是更高效的、自动重置事件版本:

class CEventSyncQueue
{
public:
    CEventSyncQueue(int nCapacity = -1);
    virtual ~CEventSyncQueue();
    virtual void Put(void* ptr);
    virtual void* Get();
protected:
    int m_nCapacity;
    CPtrList m_list;

    CRITICAL_SECTION m_lock;    
    HANDLE m_queue_not_empty;
    HANDLE m_queue_not_full;
};

CEventSyncQueue::CEventSyncQueue(int nCapacity)
{
    m_nCapacity = nCapacity;
    ::InitializeCriticalSection(&m_lock);
    m_queue_not_empty = ::CreateEvent(NULL, FALSE, FALSE, NULL);
    m_queue_not_full = ::CreateEvent(NULL, FALSE, TRUE, NULL);
}

CEventSyncQueue::~CEventSyncQueue()
{
    m_list.RemoveAll();
    ::CloseHandle(m_queue_not_empty);
    ::CloseHandle(m_queue_not_full);
    ::DeleteCriticalSection(&m_lock);
}

void CEventSyncQueue::Put(void* ptr)
{
    if (m_nCapacity <= 0)
    {
        ::EnterCriticalSection(&m_lock);
        m_list.AddTail(ptr);
        SetEvent(m_queue_not_empty);
        ::LeaveCriticalSection(&m_lock);
        return;
    }

    bool done = false;
    while (!done)
    {
        // If the queue is full, we must wait until it isn't.
        if(::WaitForSingleObject(m_queue_not_full, INFINITE) != WAIT_OBJECT_0)
        {
            ASSERT(FALSE);
        }

        // However, under some (rare) conditions we'll get here and find
        // the queue is already full again, so be prepared to loop.
        ::EnterCriticalSection(&m_lock);
        if (m_list.GetCount() < m_nCapacity)
        {
            m_list.AddTail(ptr);
            done = true;
            SetEvent(m_queue_not_empty);
            if (m_list.GetCount() < m_nCapacity)
            {
                SetEvent(m_queue_not_full);
            }
        }
        ::LeaveCriticalSection(&m_lock);
    }
}

void* CEventSyncQueue::Get()
{
    void *result = nullptr;
    while (result == nullptr)
    {
        // If the queue is empty, we must wait until it isn't.
        if(::WaitForSingleObject(m_queue_not_empty, INFINITE) != WAIT_OBJECT_0)
        {
            ASSERT(FALSE);
        }

        // However, under some (rare) conditions we'll get here and find
        // the queue is already empty again, so be prepared to loop.
        ::EnterCriticalSection(&m_lock);
        if (!m_list.IsEmpty())
        {
            result = m_list.RemoveHead();
            ASSERT(result != nullptr);
            // The queue shouldn't be full at this point!
            if (m_nCapacity <= 0) ASSERT(m_list.GetCount() < m_nCapacity);
            SetEvent(m_queue_not_full);
            if (!m_list.IsEmpty())
            {
                SetEvent(m_queue_not_empty);
            }
        }
        ::LeaveCriticalSection(&m_lock);
    }
}

太好了!谢谢Harry。我读了你的代码好几遍,没有发现任何死锁问题。你向我展示了非常好的事件使用方法。太棒了! - gelu

0

条件变量?你是指Interlocked*函数吗?这些函数已经存在很长时间了 - 我在Windows 2000中使用过它们。你可以使用它们来构建并发系统,但仍需要自己做一些工作。

或者,尝试使用OpenMP。要使用它,您需要Visual Studio 2008或更高版本。


条件(简称“条件变量”)是一种同步设备,允许线程暂停执行并放弃处理器,直到共享数据上的某个谓词得到满足。条件的基本操作包括:在条件满足时发出信号(当谓词变为真时),以及等待条件,暂停线程执行,直到另一个线程发出条件信号。 - gelu
条件变量 - gelu
请参考http://msdn.microsoft.com/en-us/library/windows/desktop/ms682052%28v=vs.85%29.aspx。 - Harry Johnston

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接