ReaderWriterLockSlim and async\await

54

我对 ReaderWriterLockSlim 有一些问题。我无法理解它的工作原理。

我的代码:

 private async Task LoadIndex()
    {
        if (!File.Exists(FileName + ".index.txt"))
        {
            return;
        }
        _indexLock.EnterWriteLock();// <1>
        _index.Clear();
        using (TextReader index = File.OpenText(FileName + ".index.txt"))
        {
            string s;
            while (null != (s = await index.ReadLineAsync()))
            {
                var ss = s.Split(':');
                _index.Add(ss[0], Convert.ToInt64(ss[1]));
            }
        }
        _indexLock.ExitWriteLock();<2>
    }

当我在<1>处进入写锁时,在调试器中我可以看到_indexLock.IsWriteLockHeldtrue,但是当执行步骤到<2>时,我发现_indexLock.IsWriteLockHeld变成了false,并且_indexLock.ExitWriteLock抛出了一个名为"SynchronizationLockException"的异常,其消息为"The write lock is being released without being held"。 我做错了什么?


“_indexLock” 是如何初始化的?在一个线程正在执行“LoadIndex()”时,另一个线程是否可能同时初始化它? - Matthew Watson
可能是另一个有访问_indexLock的线程正在重新初始化它...肯定不可能被另一个线程释放,但也许重新初始化为一个全新的实例... - Sadek Noureddine
不需要线程来覆盖 _indexLock。 - Hans Passant
_indexLock 在类实例创建时被初始化并声明为:private readonly ReaderWriterLockSlim _indexLock = new ReaderWriterLockSlim()。没有任何一个线程可以修改它。 - l0nley
6个回答

88

ReaderWriterLockSlim 是一种线程亲和锁类型,因此通常不能与 asyncawait 一起使用。

你应该使用带有 WaitAsyncSemaphoreSlim,或者(如果你真的需要读写锁),可以使用 AsyncEx 中的我的 AsyncReaderWriterLock 或者 Stephen Toub 的 AsyncReaderWriterLock


1
也可以说这取决于平台,不是吗?OP发布的代码没有说明它是WPF还是控制台应用程序。在控制台应用程序中,它不起作用,因为那里没有“SynchronizationContext”。该代码在WPF应用程序中正常工作。https://dev59.com/ymUo5IYBdhLWcg3wpgsf?noredirect=1&lq=1 - Don Box
@DonBox:不行;正如我在回答那个问题时所指出的,即使您在同一线程上恢复,仍然允许任意代码在您“持有”该锁的同时运行。 - Stephen Cleary
2
@DonBox:你在评论中问了很多问题。我认为你会受益于提出自己的问题。 - Stephen Cleary
1
好的,明白了,这是链接:http://stackoverflow.com/questions/44036160/readerwriterlockslim-questions - Don Box
1
@kuldeep: 如果没有真实的指标可以说明,我肯定会使用 SemaphoreSlim - Stephen Cleary
显示剩余7条评论

6
你可以使用可靠且轻量级的SemaphoreSlim模拟读者/写者锁定机制,并保持async/await的好处。创建一个SemaphoreSlim,使其具有与同时为读资源锁定的例程数相同的可用锁数。每个锁都将像往常一样请求一个锁。对于你的写例程,请确保在进行操作之前请求所有可用的锁。
这样,你的写例程将始终独自运行,而你的读例程可能仅在它们之间共享资源。
例如,假设你有2个读例程和1个写例程。
SemaphoreSlim semaphore = new SemaphoreSlim(2);

async void Reader1()
{
    await semaphore.WaitAsync();
    try
    {
        // ... reading stuff ...
    }
    finally
    {
        semaphore.Release();
    }
}

async void Reader2()
{
    await semaphore.WaitAsync();
    try
    {
        // ... reading other stuff ...
    }
    finally
    {
        semaphore.Release();
    }
}

async void ExclusiveWriter()
{
    // the exclusive writer must request all locks
    // to make sure the readers don't have any of them
    // (I wish we could specify the number of locks
    // instead of spamming multiple calls!)
    await semaphore.WaitAsync();
    await semaphore.WaitAsync();
    try
    {
        // ... writing stuff ...
    }
    finally
    {
        // release all locks here
        semaphore.Release(2);
        // (oh here we don't need multiple calls, how about that)
    }
}

显然,仅当您预先知道同时运行多少读取例程时,此方法才有效。诚然,太多的读取例程将使这段代码非常丑陋。


4
这并没有提供读/写语义。读写锁的概念是允许任意数量的并发读取操作,但如果有写入操作正在进行,则不允许任何读取器或其他写入器。这既不必要地限制了读取器,也不能在一次写入操作时正确地限制读取器或其他写入器。 - Servy
2
当您有活跃的读者时,我会阻止任何作者;而在没有任何活跃的作者时,仍会允许并发读取。这本质上就是读者/写者锁应该做的事情,除非您不知道会有多少读者,正如我明确指出的那样(即使这也可以解决,但我不想为您破坏惊喜)。这也可能不符合您的标准,但它是一个有效的解决方案,我自己在经过充分测试、高需求的服务中使用了它。我追求简单,但各有所好... - mycelo
1
这个快速解决方案并不依赖外部依赖,还算不错。如果需要重复使用,应该将其封装成自己的类(为每次读取调用多个.WaitAsync(),并记住您在每次使用时考虑了多少读者,并在必要时跨每次使用更改它,这将很丑陋且容易出错)。 - Marc L.
16
几个月后再看这个问题:如果两个线程同时调用ExclusiveWriter会不会出现死锁的可能性?如果两者都成功通过第一个WaitAsync()调用,那么调用计数将增加到两个,并且两个线程都将等待Release() - Marc L.
2
@MarcL。我已经将独占写入方法封装在它自己的 writeLock 信号量中。只允许一个线程进入。 - CodeMonkey

5

前段时间我为我的项目实现了一个基于两个SemaphoreSlim的AsyncReaderWriterLock类。希望它能有所帮助。它实现了相同的逻辑(多读单写),同时支持异步/等待模式。但它不支持递归,并且没有保护不正确使用的措施:

var rwLock = new AsyncReaderWriterLock();

await rwLock.AcquireReaderLock();
try
{
    // ... reading ...
}
finally
{
    rwLock.ReleaseReaderLock();
}

await rwLock.AcquireWriterLock();
try
{
    // ... writing ...
}
finally
{
    rwLock.ReleaseWriterLock();
}



public sealed class AsyncReaderWriterLock : IDisposable
{
    private readonly SemaphoreSlim _readSemaphore  = new SemaphoreSlim(1, 1);
    private readonly SemaphoreSlim _writeSemaphore = new SemaphoreSlim(1, 1);
    private          int           _readerCount;

    public async Task AcquireWriterLock(CancellationToken token = default)
    {
        await _writeSemaphore.WaitAsync(token).ConfigureAwait(false);
        await SafeAcquireReadSemaphore(token).ConfigureAwait(false);
    }

    public void ReleaseWriterLock()
    {
        _readSemaphore.Release();
        _writeSemaphore.Release();
    }

    public async Task AcquireReaderLock(CancellationToken token = default)
    {
        await _writeSemaphore.WaitAsync(token).ConfigureAwait(false);

        if (Interlocked.Increment(ref _readerCount) == 1)
        {
            try
            {
                await SafeAcquireReadSemaphore(token).ConfigureAwait(false);
            }
            catch
            {
                Interlocked.Decrement(ref _readerCount);

                throw;
            }
        }

        _writeSemaphore.Release();
    }

    public void ReleaseReaderLock()
    {
        if (Interlocked.Decrement(ref _readerCount) == 0)
        {
            _readSemaphore.Release();
        }
    }

    private async Task SafeAcquireReadSemaphore(CancellationToken token)
    {
        try
        {
            await _readSemaphore.WaitAsync(token).ConfigureAwait(false);
        }
        catch
        {
            _writeSemaphore.Release();

            throw;
        }
    }

    public void Dispose()
    {
        _writeSemaphore.Dispose();
        _readSemaphore.Dispose();
    }
}

2
@TheodorZoulias,你是对的,AcquireReaderLock首先获取_writeSemaphore以确保此时没有任何写入者。一旦获取_readerLock,写入信号量立即释放。如果您注意到,读/写获取方法会等待两个信号量。因此,SafeAcquireReadSemaphore()用于在第二个WaitAsync()中发生取消的情况下,以便在出现OCE的情况下正确释放资源。 - xtadex
1
@xtadex 我注意到的一件事是,在 AcquireReaderLock 内部,增量后发生异常会使 _readerCount 保持与之前相同。至少在使用答案中的模式时,调用者不会调用 release,因此总会有一个被中止的读取器。它可能发生的一种方式是令牌被取消。 - eglasius
1
很好的发现 @eglasius。我再次检查了代码,几乎达到了100%(除了一些棘手的竞态条件)_readSemaphore状态是自由的,并且等待从未在那里发生过。但是我已经添加了try / catch来处理这种情况。谢谢。 - xtadex
2
@xtadex 没有找到其他内容,这是我使用的版本,稍微重构了一下,并加上了一些注释 https://github.com/copenhagenatomics/CA_DataUploader/pull/90/files#diff-24a9664c904fe9276878f37dc1438aae578a76b7ef34eabbebf6ac66eaad83e6.. 谢谢! - eglasius
2
这个版本非常适合我创建的单元测试进行验证 - 而最近的(看起来更简单)版本却因死锁而未通过同样的测试。但是我真的喜欢 IDisposable 提供的 using(){} 符号表示法,所以我改编了 @eglasius 的版本,在这里支持它:https://gist.github.com/cajuncoding/a88f0d00847dcfc241ae80d1c7bafb1e - CajunCoding
显示剩余7条评论

3

https://learn.microsoft.com/en-us/dotnet/api/system.threading.readerwriterlockslim?view=net-5.0

来自源代码:

ReaderWriterLockSlim具有托管线程亲和性,即每个Thread对象必须自己进行方法调用以进入和退出锁模式。没有线程可以更改另一个线程的模式。

所以这里有一个预期的行为。 Async/await不能保证在同一线程中继续执行,所以当您尝试在一个线程中进入写锁并在另一个线程中尝试退出时,可能会捕获异常。

最好使用其他答案中提到的其他锁机制,例如SemaphoreSlim


0
就像Stephen Cleary所说,ReaderWriterLockSlim是一种线程相关的锁类型,因此通常不能与async和await一起使用。

你需要构建一个机制,以避免读者和写者同时访问共享数据。这个算法应该遵循一些规则。

请求读取锁时:

  • 是否有写入锁处于活动状态?
  • 是否已经排队了任何内容?(我认为按照请求顺序执行很好)

请求写入锁时:

  • 是否有写入锁处于活动状态?(因为写入锁不应该并行执行)
  • 是否有任何读取锁处于活动状态?
  • 是否已经排队了任何内容?

如果这些条件中的任何一个回答是,则执行应该被排队并稍后执行。您可以使用TaskCompletionSources来继续awaits

当任何读者或写者完成时,您应该评估队列并在可能时继续执行项目。


例如(nuget): AsyncReaderWriterLock

0

我受到了 @xtadex 答案的启发。不过,在我的看法中,一个 SemaphoreSlim 实例足以实现相同的行为:

internal sealed class ReadWriteSemaphore : IDisposable
{
    private readonly SemaphoreSlim _semaphore;
    private int _count;

    public ReadWriteSemaphore()
    {
        _semaphore = new SemaphoreSlim(2, 2);
        _count = 0;
    }

    public void Dispose() => _semaphore.Dispose();

    public async Task<IDisposable> WaitReadAsync(CancellationToken cancellation = default)
    {
        await _semaphore.WaitAsync(cancellation);
        if (Interlocked.Increment(ref _count) > 1)
            _semaphore.Release();
        return new Token(() =>
        {
            if (Interlocked.Decrement(ref _count) == 0)
                _semaphore.Release();
        });
    }

    public async Task<IDisposable> WaitWriteAsync(CancellationToken cancellation = default)
    {
        int count = 0;
        try
        {
            while (await _semaphore.WaitAsync(Timeout.Infinite, cancellation) && ++count < 2)
                continue;
        }
        catch (OperationCanceledException) when (count > 0)
        {
            _semaphore.Release(count);
            throw;
        }
        return new Token(() => _semaphore.Release(count));
    }

    private sealed class Token : IDisposable
    {
        private readonly Action _action;
        public Token(Action action)
        {
            _action = action;
        }
        public void Dispose() => _action.Invoke();
    }
}

它允许您使用using同步访问资源:

using (await _semaphore.WaitReadAsync()) // _semaphore is an instance of ReadWriteSemaphore
{
    // read something
}

关于编写:

using (await _semaphore.WaitWriteAsync()) // _semaphore is an instance of ReadWriteSemaphore
{
    // write something
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接