仅使用临界区的Win32读/写锁

9
我需要在工作项目中使用Win32 API实现C++读/写锁。所有现有的解决方案都使用内核对象(信号量和互斥体),这些对象在执行期间需要进行上下文切换,速度太慢了。
如果可能的话,我想使用临界区来实现它。该锁不必是进程安全的,只需要是线程安全的。你有什么关于如何实现的想法吗?
10个回答

12

如果您的目标是Vista或更高版本,应该使用内置的SRWLock's。它们像临界区一样轻巧,在没有争用时完全处于用户模式。

Joe Duffy的博客最近有一些关于实现不同类型的非阻塞读写锁的文章。这些锁会自旋,因此如果您打算在持有锁时执行大量工作,则不适合使用。代码是C#编写的,但是应该很容易转换为本地代码。

您可以使用临界区和事件来实现读写锁 - 您只需要保持足够的状态以在必要时发出事件信号,以避免不必要的内核模式调用。


6

虽然这是一个老问题,但以下内容应该有效。它不会在争用时自旋。如果读者几乎没有争用,则产生的额外成本有限,因为SetEvent被懒惰地调用(查看编辑历史以获取没有此优化的更重量级版本)。

#include <windows.h>

typedef struct _RW_LOCK {
    CRITICAL_SECTION countsLock;
    CRITICAL_SECTION writerLock;
    HANDLE noReaders;
    int readerCount;
    BOOL waitingWriter;
} RW_LOCK, *PRW_LOCK;

void rwlock_init(PRW_LOCK rwlock)
{
    InitializeCriticalSection(&rwlock->writerLock);
    InitializeCriticalSection(&rwlock->countsLock);

    /*
     * Could use a semaphore as well.  There can only be one waiter ever,
     * so I'm showing an auto-reset event here.
     */
    rwlock->noReaders = CreateEvent (NULL, FALSE, FALSE, NULL);
}

void rwlock_rdlock(PRW_LOCK rwlock)
{
    /*
     * We need to lock the writerLock too, otherwise a writer could
     * do the whole of rwlock_wrlock after the readerCount changed
     * from 0 to 1, but before the event was reset.
     */
    EnterCriticalSection(&rwlock->writerLock);
    EnterCriticalSection(&rwlock->countsLock);
    ++rwlock->readerCount;
    LeaveCriticalSection(&rwlock->countsLock);
    LeaveCriticalSection(&rwlock->writerLock);
}

int rwlock_wrlock(PRW_LOCK rwlock)
{
    EnterCriticalSection(&rwlock->writerLock);
    /*
     * readerCount cannot become non-zero within the writerLock CS,
     * but it can become zero...
     */
    if (rwlock->readerCount > 0) {
        EnterCriticalSection(&rwlock->countsLock);

        /* ... so test it again.  */
        if (rwlock->readerCount > 0) {
            rwlock->waitingWriter = TRUE;
            LeaveCriticalSection(&rwlock->countsLock);
            WaitForSingleObject(rwlock->noReaders, INFINITE);
        } else {
            /* How lucky, no need to wait.  */
            LeaveCriticalSection(&rwlock->countsLock);
        }
    }

    /* writerLock remains locked.  */
}

void rwlock_rdunlock(PRW_LOCK rwlock)
{
    EnterCriticalSection(&rwlock->countsLock);
    assert (rwlock->readerCount > 0);
    if (--rwlock->readerCount == 0) {
        if (rwlock->waitingWriter) {
            /*
             * Clear waitingWriter here to avoid taking countsLock
             * again in wrlock.
             */
            rwlock->waitingWriter = FALSE;
            SetEvent(rwlock->noReaders);
        }
    }
    LeaveCriticalSection(&rwlock->countsLock);
}

void rwlock_wrunlock(PRW_LOCK rwlock)
{
    LeaveCriticalSection(&rwlock->writerLock);
}

您可以通过使用一个单一的CRITICAL_SECTION来降低读者的成本:
  • countsLock is replaced with writerLock in rdlock and rdunlock

  • rwlock->waitingWriter = FALSE is removed in wrunlock

  • wrlock's body is changed to

    EnterCriticalSection(&rwlock->writerLock);
    rwlock->waitingWriter = TRUE;
    while (rwlock->readerCount > 0) {
        LeaveCriticalSection(&rwlock->writerLock);
        WaitForSingleObject(rwlock->noReaders, INFINITE);
        EnterCriticalSection(&rwlock->writerLock);
    }
    rwlock->waitingWriter = FALSE;
    
    /* writerLock remains locked.  */
    
然而这种方式不够公平,因此我更喜欢上述解决方案。

@KindDragon,我已经包含了一个新版本的算法,它对读取器来说更加便宜。 - Paolo Bonzini

6
我认为至少需要使用一个内核级对象(Mutex或Semaphore)才能完成此操作,因为您需要内核的帮助来使调用进程阻塞,直到锁可用。
临界区确实提供了阻塞功能,但API过于有限。例如,您无法获取关键部分,发现读取锁可用但写入锁不可用,并等待其他进程完成读取(因为如果其他进程拥有关键部分,则会阻止其他读者,这是错误的;如果没有,则您的进程将不会阻塞,而是旋转,消耗CPU周期)。
但是,您可以使用自旋锁,并在有争用时回退到互斥锁。关键部分本身就是这样实现的。我会使用现有的关键部分实现,并将PID字段替换为单独的读取器和编写器计数。

3

3

2
好的链接 - 该文章指出Vista具有内置的读写锁http://msdn.microsoft.com/en-us/library/aa904937(VS.85).aspx - sean e

1

这是一个老问题,但也许有人会发现这很有用。我们开发了一个高性能的开源RWLock for Windows,它自动使用Vista+ SRWLockMichael提到,如果可用,否则回退到用户空间实现。

作为额外的奖励,它有四种不同的“口味”(虽然你可以坚持基本的,这也是最快的),每个都提供更多的同步选项。它从基本的RWLock()开始,它是非可重入的,仅限于单进程同步,并且不能交换读/写锁定,到完全跨进程IPC RWLock,支持可重入和读/写降级。

如上所述,它们在可能时动态切换到Vista+轻量级读写锁,以获得最佳性能,但您根本不必担心,因为它将回退到Windows XP及其类似系统上的完全兼容实现。


0

我使用了临界区来编写以下代码。

class ReadWriteLock {
    volatile LONG writelockcount;
    volatile LONG readlockcount;
    CRITICAL_SECTION cs;
public:
    ReadWriteLock() {
        InitializeCriticalSection(&cs);
        writelockcount = 0;
        readlockcount = 0;
    }
    ~ReadWriteLock() {
        DeleteCriticalSection(&cs);
    }
    void AcquireReaderLock() {        
    retry:
        while (writelockcount) {
            Sleep(0);
        }
        EnterCriticalSection(&cs);
        if (!writelockcount) {
            readlockcount++;
        }
        else {
            LeaveCriticalSection(&cs);
            goto retry;
        }
        LeaveCriticalSection(&cs);
    }
    void ReleaseReaderLock() {
        EnterCriticalSection(&cs);
        readlockcount--;
        LeaveCriticalSection(&cs);
    }
    void AcquireWriterLock() {
        retry:
        while (writelockcount||readlockcount) {
            Sleep(0);
        }
        EnterCriticalSection(&cs);
        if (!writelockcount&&!readlockcount) {
            writelockcount++;
        }
        else {
            LeaveCriticalSection(&cs);
            goto retry;
        }
        LeaveCriticalSection(&cs);
    }
    void ReleaseWriterLock() {
        EnterCriticalSection(&cs);
        writelockcount--;
        LeaveCriticalSection(&cs);
    }
};

要执行自旋等待,请注释掉 Sleep(0) 的行。


0

这是我能想到的最小解决方案:

http://www.baboonz.org/rwlock.php

并原样粘贴:

/** A simple Reader/Writer Lock.

This RWL has no events - we rely solely on spinlocks and sleep() to yield control to other threads.
I don't know what the exact penalty is for using sleep vs events, but at least when there is no contention, we are basically
as fast as a critical section. This code is written for Windows, but it should be trivial to find the appropriate
equivalents on another OS.

**/
class TinyReaderWriterLock
{
public:
    volatile uint32 Main;
    static const uint32 WriteDesireBit = 0x80000000;

    void Noop( uint32 tick )
    {
        if ( ((tick + 1) & 0xfff) == 0 )     // Sleep after 4k cycles. Crude, but usually better than spinning indefinitely.
            Sleep(0);
    }

    TinyReaderWriterLock()                 { Main = 0; }
    ~TinyReaderWriterLock()                { ASSERT( Main == 0 ); }

    void EnterRead()
    {
        for ( uint32 tick = 0 ;; tick++ )
        {
            uint32 oldVal = Main;
            if ( (oldVal & WriteDesireBit) == 0 )
            {
                if ( InterlockedCompareExchange( (LONG*) &Main, oldVal + 1, oldVal ) == oldVal )
                    break;
            }
            Noop(tick);
        }
    }

    void EnterWrite()
    {
        for ( uint32 tick = 0 ;; tick++ )
        {
            if ( (tick & 0xfff) == 0 )                                     // Set the write-desire bit every 4k cycles (including cycle 0).
                _InterlockedOr( (LONG*) &Main, WriteDesireBit );

            uint32 oldVal = Main;
            if ( oldVal == WriteDesireBit )
            {
                if ( InterlockedCompareExchange( (LONG*) &Main, -1, WriteDesireBit ) == WriteDesireBit )
                    break;
            }
            Noop(tick);
        }
    }

    void LeaveRead()
    {
        ASSERT( Main != -1 );
        InterlockedDecrement( (LONG*) &Main );
    }
    void LeaveWrite()
    {
        ASSERT( Main == -1 );
        InterlockedIncrement( (LONG*) &Main );
    }
};

0

如果您已经知道一种仅使用互斥量的解决方案,那么您应该能够修改它以使用临界区。

我们使用了两个临界区和一些计数器来自己实现。它适合我们的需求 - 我们有非常少的写入者数量,写入者优先于读取者等等。我不能公开我们的代码,但可以说不使用互斥量和信号量也是可能的。


2
问题是使用标准互斥量,任何人都可以锁定/解锁它。而对于临界区域来说则不是这样的,这使得我的解决方案无法奏效。 - Dan Lorenc
只有拥有互斥锁的线程才能解锁它,这句话不够清晰。一旦释放了互斥锁,任何人都可以锁定它。关键区域同理。 - sean e
2
抱歉,我指的是信号量。我需要一个任何人都可以减少信号量的解决方案,这在临界区中是不可能的。 - Dan Lorenc
根据信号量的用途,您可能可以使用通过InterlockedIncrement/InterlockedDecrement修改的计数器。 - sean e

-1

请查看我的实现:

https://github.com/coolsoftware/LockLib

VRWLock是一个实现单写多读逻辑的C++类。

还可以查看测试项目TestLock.sln。

更新。下面是读者和写者的简单代码:

LONG gCounter = 0;

// reader

for (;;) //loop
{
  LONG n = InterlockedIncrement(&gCounter); 
  // n = value of gCounter after increment
  if (n <= MAX_READERS) break; // writer does not write anything - we can read
  InterlockedDecrement(&gCounter);
}
// read data here
InterlockedDecrement(&gCounter); // release reader

// writer

for (;;) //loop
{
  LONG n = InterlockedCompareExchange(&gCounter, (MAX_READERS+1), 0); 
  // n = value of gCounter before attempt to replace it by MAX_READERS+1 in InterlockedCompareExchange
  // if gCounter was 0 - no readers/writers and in gCounter will be MAX_READERS+1
  // if gCounter was not 0 - gCounter stays unchanged
  if (n == 0) break;
}
// write data here
InterlockedExchangeAdd(&gCounter, -(MAX_READERS+1)); // release writer

VRWLock类支持自旋次数和线程特定引用计数,允许释放已终止线程的锁。


请不要仅仅抛出链接。请包含相关代码并解释它如何回答问题。 - Esoteric Screen Name

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接