基于键的异步锁定

41

我正在尝试解决使用我的ImageProcessor库这里时出现的问题,当我将项目添加到缓存中时,会出现间歇性的文件访问错误。

System.IO.IOException: 由于另一个进程正在使用,因此无法访问文件“D:\home\site\wwwroot\app_data\cache\0\6\5\f\2\7\065f27fc2c8e843443d210a1e84d1ea28bbab6c4.webp”。

我编写了一个类,旨在根据哈希URL生成的密钥执行异步锁定,但似乎在实现中漏掉了一些东西。

我的锁定类

public sealed class AsyncDuplicateLock
{
    /// <summary>
    /// The collection of semaphore slims.
    /// </summary>
    private static readonly ConcurrentDictionary<object, SemaphoreSlim> SemaphoreSlims
                            = new ConcurrentDictionary<object, SemaphoreSlim>();

    /// <summary>
    /// Locks against the given key.
    /// </summary>
    /// <param name="key">
    /// The key that identifies the current object.
    /// </param>
    /// <returns>
    /// The disposable <see cref="Task"/>.
    /// </returns>
    public IDisposable Lock(object key)
    {
        DisposableScope releaser = new DisposableScope(
        key,
        s =>
        {
            SemaphoreSlim locker;
            if (SemaphoreSlims.TryRemove(s, out locker))
            {
                locker.Release();
                locker.Dispose();
            }
        });

        SemaphoreSlim semaphore = SemaphoreSlims.GetOrAdd(key, new SemaphoreSlim(1, 1));
        semaphore.Wait();
        return releaser;
    }

    /// <summary>
    /// Asynchronously locks against the given key.
    /// </summary>
    /// <param name="key">
    /// The key that identifies the current object.
    /// </param>
    /// <returns>
    /// The disposable <see cref="Task"/>.
    /// </returns>
    public Task<IDisposable> LockAsync(object key)
    {
        DisposableScope releaser = new DisposableScope(
        key,
        s =>
        {
            SemaphoreSlim locker;
            if (SemaphoreSlims.TryRemove(s, out locker))
            {
                locker.Release();
                locker.Dispose();
            }
        });

        Task<IDisposable> releaserTask = Task.FromResult(releaser as IDisposable);
        SemaphoreSlim semaphore = SemaphoreSlims.GetOrAdd(key, new SemaphoreSlim(1, 1));

        Task waitTask = semaphore.WaitAsync();

        return waitTask.IsCompleted
                   ? releaserTask
                   : waitTask.ContinueWith(
                       (_, r) => (IDisposable)r,
                       releaser,
                       CancellationToken.None,
                       TaskContinuationOptions.ExecuteSynchronously,
                       TaskScheduler.Default);
    }

    /// <summary>
    /// The disposable scope.
    /// </summary>
    private sealed class DisposableScope : IDisposable
    {
        /// <summary>
        /// The key
        /// </summary>
        private readonly object key;

        /// <summary>
        /// The close scope action.
        /// </summary>
        private readonly Action<object> closeScopeAction;

        /// <summary>
        /// Initializes a new instance of the <see cref="DisposableScope"/> class.
        /// </summary>
        /// <param name="key">
        /// The key.
        /// </param>
        /// <param name="closeScopeAction">
        /// The close scope action.
        /// </param>
        public DisposableScope(object key, Action<object> closeScopeAction)
        {
            this.key = key;
            this.closeScopeAction = closeScopeAction;
        }

        /// <summary>
        /// Disposes the scope.
        /// </summary>
        public void Dispose()
        {
            this.closeScopeAction(this.key);
        }
    }
}

使用 - 在 HttpModule 中

private readonly AsyncDuplicateLock locker = new AsyncDuplicateLock();

using (await this.locker.LockAsync(cachedPath))
{
    // Process and save a cached image.
}

有没有人能发现我的错误所在?我担心我对某些基本概念的理解出现了误差。

该库的完整源代码存储在Github 这里


这个库支持ResizeAsync或者一般的xxxAsync吗?我想异步使用你的库。 - Royi Namir
将方法包装在一个任务中。由于创建线程的成本很高,因此其中没有原生的异步方法。 - James South
詹姆斯,我正在谈论IO操作,例如保存到流中。 - Royi Namir
抱歉,我害怕仍然不行。底层代码 Image.Save(stream) 没有异步重载。 - James South
6个回答

75
另一位回答者所指出的,原始代码在释放信号量之前从ConcurrentDictionary中移除了SemaphoreSlim。因此,有过多的信号量翻转发生——它们被从字典中删除时仍可以使用(未获取,但已从字典中检索到)。
这种“映射锁”的问题在于难以知道何时不再需要该信号量。一个选择是根本不释放信号量;这是简单的解决方案,但在您的情况下可能不可接受。另一个选择——如果这些信号量实际上与对象实例相关而不是值(如字符串),则可以使用弱引用将它们附加。然而,我认为在您的情况下,这个选项也不可接受。
所以,我们用较困难的方法来解决。:)
有几种不同的方法可以解决这个问题。我认为从引用计数的角度考虑(对字典中的每个信号量进行引用计数)是有意义的。此外,我们希望使减少计数和删除操作成为原子操作,因此我只使用了一个lock(使并发字典变得多余)。
public sealed class AsyncDuplicateLock
{
  private sealed class RefCounted<T>
  {
    public RefCounted(T value)
    {
      RefCount = 1;
      Value = value;
    }

    public int RefCount { get; set; }
    public T Value { get; private set; }
  }

  private static readonly Dictionary<object, RefCounted<SemaphoreSlim>> SemaphoreSlims
                        = new Dictionary<object, RefCounted<SemaphoreSlim>>();

  private SemaphoreSlim GetOrCreate(object key)
  {
    RefCounted<SemaphoreSlim> item;
    lock (SemaphoreSlims)
    {
      if (SemaphoreSlims.TryGetValue(key, out item))
      {
        ++item.RefCount;
      }
      else
      {
        item = new RefCounted<SemaphoreSlim>(new SemaphoreSlim(1, 1));
        SemaphoreSlims[key] = item;
      }
    }
    return item.Value;
  }

  public IDisposable Lock(object key)
  {
    GetOrCreate(key).Wait();
    return new Releaser { Key = key };
  }

  public async Task<IDisposable> LockAsync(object key)
  {
    await GetOrCreate(key).WaitAsync().ConfigureAwait(false);
    return new Releaser { Key = key };
  }

  private sealed class Releaser : IDisposable
  {
    public object Key { get; set; }

    public void Dispose()
    {
      RefCounted<SemaphoreSlim> item;
      lock (SemaphoreSlims)
      {
        item = SemaphoreSlims[Key];
        --item.RefCount;
        if (item.RefCount == 0)
          SemaphoreSlims.Remove(Key);
      }
      item.Value.Release();
    }
  }
}

2
Ephemerons是一种动态语言概念,它将一个对象与另一个对象的生命周期联系起来。就像您可以添加到ExpandoObject的属性一样,但ephemerons可以附加到任何对象(在这方面更像JavaScript属性)。唯一的.NET ephemeron是ConditionalWeakTable,这是一个难以使用的对象。我编写了一个简单的包装库,名为ConnectedProperties - Stephen Cleary
3
太棒了!一个优雅的方法,如果我尝试过,我会一直过度设计。我已经实现了一个永远不会释放的实现,但是对于不断增加的内存使用量并不满意。非常感谢! - James South
2
@too:我不会这么做。被锁定的时间非常短,所以即使大多数使用是读取,改变它可能也不值得。 - Stephen Cleary
2
@kor_: 使用专用对象进行锁定从来不是一个坏主意。在这种特定情况下,我可以锁定字典实例,因为它是私有的且从未公开过。由于此处的代码是唯一可以访问字典实例的代码,因此我知道没有其他东西可以锁定它。 - Stephen Cleary
2
根据这个解决方案,我创建了一个可在NuGet https://www.nuget.org/packages/AsyncKeyedLock 和GitHub https://github.com/MarkCiliaVincenti/AsyncKeyedLock 上使用的.NET Standard 2.0库。 - Mark Cilia Vincenti
显示剩余21条评论

4

你的实现中存在的问题源于你想要从字典中删除未使用的信号量。如果每个 SemaphoreSlim 都可以一直留在字典中(直到进程终止),那么这将会简单得多。假设这不是可行的选项,你需要克服两个障碍:

  1. 如何跟踪每个信号量有多少工作线程正在使用,以便知道何时可以安全地将其删除。
  2. 如何使用高性能但棘手的 ConcurrentDictionary<K,V> 集合来完成上述操作。

Stephen Cleary的答案展示了如何使用普通的Dictionary<K,V>解决第一个问题。每个SemaphoreSlim都存储了一个引用计数器,并且所有内容都与单个锁对象上的lock语句同步。在这个答案中,我将展示如何解决第二个问题。

ConcurrentDictionary<K,V> 集合的问题在于它仅保护其内部状态而不是它包含的值,因此,如果您使用可变类作为 TValue,则会为 微妙的竞态条件 打开大门,特别是如果您打算将这些值缓存到池中并重用它们。消除竞态条件的技巧是使 TValue 成为不可变结构体。这样,它基本上成为字典的内部状态的一部分,并受到其保护。在下面的 AsyncDuplicateLock 实现中,TValue 是一个 readonly struct,也被声明为 record 以获得性能¹和方便性:

public class AsyncDuplicateLock
{
    private readonly ConcurrentDictionary<object, Entry> _semaphores = new();

    private readonly record struct Entry(SemaphoreSlim Semaphore, int RefCount);

    public readonly struct Releaser : IDisposable
    {
        private readonly AsyncDuplicateLock _parent;
        private readonly object _key;
        public Releaser(AsyncDuplicateLock parent, object key)
        {
            _parent = parent; _key = key;
        }
        public void Dispose() => _parent.Release(_key);
    }

    public async ValueTask<Releaser> LockAsync(object key)
    {
        Entry entry = _semaphores.AddOrUpdate(key,
            static _ => new Entry(new SemaphoreSlim(1, 1), 1),
            static (_, entry) => entry with { RefCount = entry.RefCount + 1 });

        await entry.Semaphore.WaitAsync().ConfigureAwait(false);
        return new Releaser(this, key);
    }

    private void Release(object key)
    {
        Entry entry;
        while (true)
        {
            bool exists = _semaphores.TryGetValue(key, out entry);
            if (!exists)
                throw new InvalidOperationException("Key not found.");
            if (entry.RefCount > 1)
            {
                Entry newEntry = entry with { RefCount = entry.RefCount - 1 };
                if (_semaphores.TryUpdate(key, newEntry, entry))
                    break;
            }
            else
            {
                if (_semaphores.TryRemove(KeyValuePair.Create(key, entry)))
                    break;
            }
        }
        entry.Semaphore.Release();
    }
}

请注意,增加和减少RefCount涉及在while循环中旋转。这是因为当前线程可能会输掉与其他线程更新字典的乐观竞争,在这种情况下,它会一直尝试直到成功。旋转在Release方法中很明显,但也在LockAsync方法内部发生。AddOrUpdate方法在调用updateValueFactory委托时采用类似的逻辑内部

性能:在高并发条件下,上述实现比一个更简单的基于Dictionary<K,V>的实现快大约80%。这是因为ConcurrentDictionary<K,V>在内部利用了多个锁对象,所以想要锁定键"A"的线程不必等待另一个线程完成获取或释放键"B"。虽然它需要更多的分配,但它更加高效。如果您有一些理由让垃圾收集器放松,那么基于Dictionary<K,V>的实现将更好地为您服务。如果您既想要最终的速度又想要最终的内存效率,您可以查看此答案的第6版,它是基于多个Dictionary<K,V>的实现。

注意:当错误使用SemaphoreSlim类时,它会抛出SemaphoreFullException异常。这种情况发生在释放信号量的次数超过获取信号量的次数时。本答案中AsyncDuplicateLock的实现在错误使用时表现不同:它会抛出一个InvalidOperationException("Key not found.")异常。这是因为当一个键被释放的次数等于获取次数时,相关的信号量将从字典中移除。如果此实现抛出SemaphoreFullException异常,则表示存在错误。

¹ ConcurrentDictionary<K,V>在许多操作(包括AddOrUpdateTryUpdateTryRemove等)中比较TValue,使用EqualityComparer<TValue>.Default。默认情况下,结构体不会被高效地比较,除非它们实现了IEquatable<T>接口。记录结构体确实实现了这个接口,类似于值元组的方式,因此它们可以高效地进行相等性比较。实际上,将值元组作为TValue(SemaphoreSlim, int))可能会稍微更高效,因为值元组的成员是字段,而记录结构体的成员是属性。不过,记录结构体更加方便。


对于同样的想法的非异步变体,请查看这个答案。 - Theodor Zoulias

3
我写了一个名为AsyncKeyedLock的库来解决这个常见问题。该库目前支持使用类型object(因此您可以将不同类型混合在一起)或使用泛型以获得更高效的解决方案。它允许超时、取消令牌,并且还使用池来减少分配。其底层使用ConcurrentDictionary,并允许设置此字典的初始容量和并发性。
我已经对其进行了基准测试,并与此处提供的其他解决方案进行了比较。从速度、内存使用(分配)以及可扩展性(在内部使用更可扩展的ConcurrentDictionary)方面,它更加高效。它正在生产中的许多系统中使用,并被许多流行的库使用。
源代码可在GitHub上获取,并在NuGet上打包。
这里的方法基本上是使用ConcurrentDictionary来存储一个具有计数器和SemaphoreSlimIDisposable对象。一旦该计数器达到0,它将从字典中删除,并且会被清除或返回到池中(如果使用池)。Monitor用于在计数器被增加或减少时锁定此对象。
使用示例:
var locker = new AsyncKeyedLocker<string>(o =>
{
   o.PoolSize = 20;
   o.PoolInitialFill = 1;
});

string key = "my key";

// asynchronous code
using (await locker.LockAsync(key, cancellationToken))
{
   ...
}

// synchronous code
using (locker.Lock(key))
{
   ...
}

NuGet下载。


0

针对给定的键,

  1. 线程1调用GetOrAdd并添加一个新的信号量,并通过Wait获取它
  2. 线程2调用GetOrAdd并获取现有的信号量,并在Wait上阻塞
  3. 线程1释放信号量,仅在调用TryRemove后才从字典中删除信号量
  4. 线程2现在获取信号量。
  5. 线程3为与线程1和2相同的键调用GetOrAdd。线程2仍然持有信号量,但信号量不在字典中,因此线程3创建一个新的信号量,线程2和3都访问同一受保护的资源。

您需要调整逻辑。只有在没有等待者时,才应将信号量从字典中删除。

这里是一个潜在的解决方案,减去了异步部分:

public sealed class AsyncDuplicateLock
{
    private class LockInfo
    {
        private SemaphoreSlim sem;
        private int waiterCount;

        public LockInfo()
        {
            sem = null;
            waiterCount = 1;
        }

        // Lazily create the semaphore
        private SemaphoreSlim Semaphore
        {
            get
            {
                var s = sem;
                if (s == null)
                {
                    s = new SemaphoreSlim(0, 1);
                    var original = Interlocked.CompareExchange(ref sem, null, s);
                    // If someone else already created a semaphore, return that one
                    if (original != null)
                        return original;
                }
                return s;
            }
        }

        // Returns true if successful
        public bool Enter()
        {
            if (Interlocked.Increment(ref waiterCount) > 1)
            {
                Semaphore.Wait();
                return true;
            }
            return false;
        }

        // Returns true if this lock info is now ready for removal
        public bool Exit()
        {
            if (Interlocked.Decrement(ref waiterCount) <= 0)
                return true;

            // There was another waiter
            Semaphore.Release();
            return false;
        }
    }

    private static readonly ConcurrentDictionary<object, LockInfo> activeLocks = new ConcurrentDictionary<object, LockInfo>();

    public static IDisposable Lock(object key)
    {
        // Get the current info or create a new one
        var info = activeLocks.AddOrUpdate(key,
          (k) => new LockInfo(),
          (k, v) => v.Enter() ? v : new LockInfo());

        DisposableScope releaser = new DisposableScope(() =>
        {
            if (info.Exit())
            {
                // Only remove this exact info, in case another thread has
                // already put its own info into the dictionary
                ((ICollection<KeyValuePair<object, LockInfo>>)activeLocks)
                  .Remove(new KeyValuePair<object, LockInfo>(key, info));
            }
        });

        return releaser;
    }

    private sealed class DisposableScope : IDisposable
    {
        private readonly Action closeScopeAction;

        public DisposableScope(Action closeScopeAction)
        {
            this.closeScopeAction = closeScopeAction;
        }

        public void Dispose()
        {
            this.closeScopeAction();
        }
    }
}

谢谢。我明白了解释,但现在我正在努力找出要测试哪个属性以确保它没有等待者。https://msdn.microsoft.com/en-us/library/system.threading.semaphoreslim_properties(v=vs.110).aspx - James South
谢谢... 但那不仅仅是同步的吗? - James South
是的,我甚至说过了。异步操作由您自己设计。这应该不难,因为它基于“SemaphoreSlim”,就像您的原始代码一样。 - Dark Falcon
好的,经过仔细检查代码后,我不确定能让它正常工作。锁定方法不能是静态的,而且我看不到如何公开semaphore.WaitAsync()以异步返回结果。 - James South

0

我用以下方式重写了@StephenCleary的答案:

public sealed class AsyncLockList {

    readonly Dictionary<object, SemaphoreReferenceCount> Semaphores = new Dictionary<object, SemaphoreReferenceCount>();

    SemaphoreSlim GetOrCreateSemaphore(object key) {
        lock (Semaphores) {
            if (Semaphores.TryGetValue(key, out var item)) {
                item.IncrementCount();
            } else {
                item = new SemaphoreReferenceCount();
                Semaphores[key] = item;
            }
            return item.Semaphore;
        }
    }

    public IDisposable Lock(object key) {
        GetOrCreateSemaphore(key).Wait();
        return new Releaser(Semaphores, key);
    }

    public async Task<IDisposable> LockAsync(object key) {
        await GetOrCreateSemaphore(key).WaitAsync().ConfigureAwait(false);
        return new Releaser(Semaphores, key);
    }

    sealed class SemaphoreReferenceCount {
        public readonly SemaphoreSlim Semaphore = new SemaphoreSlim(1, 1);
        public int Count { get; private set; } = 1;

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void IncrementCount() => Count++;

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void DecrementCount() => Count--;
    }

    sealed class Releaser : IDisposable {
        readonly Dictionary<object, SemaphoreReferenceCount> Semaphores;
        readonly object Key;

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public Releaser(Dictionary<object, SemaphoreReferenceCount> semaphores, object key) {
            Semaphores = semaphores;
            Key = key;
        }

        public void Dispose() {
            lock (Semaphores) {
                var item = Semaphores[Key];
                item.DecrementCount();
                if (item.Count == 0)
                    Semaphores.Remove(Key);
                item.Semaphore.Release();
            }
        }
    }
}

1
@stephencleary,我用这种重写的代码更为舒适,您能否评论一下我可能引入的任何显著的低效性? - bboyle1234
我把你的答案复制粘贴到我的解决方案中。我更喜欢你重写代码的方式。然而它并不起作用。我不能告诉你为什么,但是对于相同的密钥没有锁定发生。Dark Falcon的答案开箱即用! - Mathieu G
我是指@Stephen Cleary。 - Mathieu G

0
这个之前的回答的启发,这里提供一个支持异步等待的版本:
    public class KeyedLock<TKey>
    {
        private readonly ConcurrentDictionary<TKey, LockInfo> _locks = new();

        public int Count => _locks.Count;

        public async Task<IDisposable> WaitAsync(TKey key, CancellationToken cancellationToken = default)
        {
            // Get the current info or create a new one.
            var info = _locks.AddOrUpdate(key,
                // Add
                k => new LockInfo(),
                // Update
                (k, v) => v.Enter() ? v : new LockInfo());

            try
            {
                await info.Semaphore.WaitAsync(cancellationToken);

                return new Releaser(() => Release(key, info, true));
            }
            catch (OperationCanceledException)
            {
                // The semaphore wait was cancelled, release the lock.
                Release(key, info, false);
                throw;
            }
        }

        private void Release(TKey key, LockInfo info, bool isCurrentlyLocked)
        {
            if (info.Leave())
            {
                // This was the last lock for the key.

                // Only remove this exact info, in case another thread has
                // already put its own info into the dictionary
                // Note that this call to Remove(entry) is in fact thread safe.
                var entry = new KeyValuePair<TKey, LockInfo>(key, info);
                if (((ICollection<KeyValuePair<TKey, LockInfo>>)_locks).Remove(entry))
                {
                    // This exact info was removed.
                    info.Dispose();
                }
            }
            else if (isCurrentlyLocked)
            {
                // There is another waiter.
                info.Semaphore.Release();
            }
        }

        private class LockInfo : IDisposable
        {
            private SemaphoreSlim _semaphore = null;
            private int _refCount = 1;

            public SemaphoreSlim Semaphore
            {
                get
                {
                    // Lazily create the semaphore.
                    var s = _semaphore;
                    if (s is null)
                    {
                        s = new SemaphoreSlim(1, 1);

                        // Assign _semaphore if its current value is null.
                        var original = Interlocked.CompareExchange(ref _semaphore, s, null);

                        // If someone else already created a semaphore, return that one
                        if (original is not null)
                        {
                            s.Dispose();
                            return original;
                        }
                    }
                    return s;
                }
            }

            // Returns true if successful
            public bool Enter()
            {
                if (Interlocked.Increment(ref _refCount) > 1)
                {
                    return true;
                }

                // This lock info is not valid anymore - its semaphore is or will be disposed.
                return false;
            }

            // Returns true if this lock info is now ready for removal
            public bool Leave()
            {
                if (Interlocked.Decrement(ref _refCount) <= 0)
                {
                    // This was the last lock
                    return true;
                }

                // There is another waiter
                return false;
            }

            public void Dispose() => _semaphore?.Dispose();
        }

        private sealed class Releaser : IDisposable
        {
            private readonly Action _dispose;

            public Releaser(Action dispose) => _dispose = dispose;

            public void Dispose() => _dispose();
        }
    }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接