SemaphoreSlim与动态maxCount

11

我面临一个问题,需要限制对另一个Web服务器的调用次数。这将因为该服务器是共享的,可能会有更多或更少的容量而有所不同。

我在考虑使用SemaphoreSlim类,但没有公共属性来更改最大计数。

我应该将SemaphoreSlim类包装在另一个处理最大计数的类中吗?还有更好的方法吗?

编辑:

这是我正在尝试的:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Semaphore
{
class Program
{
    static SemaphoreSlim _sem = new SemaphoreSlim(10,10000);

    static void Main(string[] args)
    {
        int max = 15;

        for (int i = 1; i <= 50; i++)
        {
            new Thread(Enter).Start(new int[] { i, max});
        }

        Console.ReadLine();

        max = 11;

        for (int i = 1; i <= 50; i++)
        {
            new Thread(Enter).Start(new int[] { i, max });
        }
    }

    static void Enter(object param)
    {
        int[] arr = (int[])param;
        int id = arr[0];
        int max = arr[1];

        try
        {
            Console.WriteLine(_sem.CurrentCount);

            if (_sem.CurrentCount <= max)
                _sem.Release(1);
            else
            {
                _sem.Wait(1000);

                Console.WriteLine(id + " wants to enter");

                Thread.Sleep((1000 * id) / 2); // can be here at

                Console.WriteLine(id + " is in!"); // Only three threads

            }
        }
        catch(Exception ex)
        {
            Console.WriteLine("opps ", id);
            Console.WriteLine(ex.Message);
        }
        finally            
        {
            _sem.Release();
        }
    }
}
}

问题:

1- _sem.Wait(1000) 应该取消执行超过 1000 毫秒的线程,是这样吗?

2- 我掌握了使用 Release/Wait 的思路吗?

5个回答

24
您无法更改最大计数,但您可以创建一个具有非常高最大计数的SemaphoreSlim,并保留其中一些。请参见此构造函数

所以假设绝对最大并发调用数量为100,但最初您希望它为25。 您初始化信号量:

SemaphoreSlim sem = new SemaphoreSlim(25, 100);

同时可处理的请求数是25个,您已预留了其余的75个。

如果您想增加允许的请求数,只需调用Release(num)。如果您调用了Release(10),则该数字将变为35。

现在,如果您想减少可用请求的数量,则必须多次调用WaitOne。例如,如果您想从可用数量中减去10:

for (var i = 0; i < 10; ++i)
{
    sem.WaitOne();
}

这可能会阻塞,直到其他客户端释放信号量。也就是说,如果您允许35个并发请求,并且想将其减少到25个,但已经有35个具有活动请求的客户端,则WaitOne将被阻塞,直到某个客户端调用Release,并且循环不会终止,直到10个客户端释放为止。


这可能有所帮助,但我需要更灵活的东西。比如说,最多1000个并发,但几个小时后,最大值应该是600或1200。我相信SemaphoreSlim无法提供这种灵活性。=( - Thiago Custodio
6
你有没有读过答案?将第二个参数设置为您允许的最大值。然后,您可以使用ReleaseWaitOne进行调整可用的数量,就像描述的那样。 - Jim Mischel
你能看一下我的示例代码并帮我吗? - Thiago Custodio
@jim-mischel 你能想到一种方法来确保信号量槽的数量不会低于给定的最小值吗?我想避免将插槽数量减少到零,从而防止我的任何任务继续进行。像这样的东西可靠吗:if (sem.CurrentCount > myMinNumberOfSlots) sem.Wait(); - desautelsj
@jim-mischel 我做了一些测试并意识到,我们可以排除CurrentCount来确定插槽数量的目的。该属性指示当前可用插槽数量,其值随每次ReleaseWait调用而上下波动。 - desautelsj
你可以使用 while 循环来消耗请求,直到它将你降到所需的级别:while(semLock.CurrentCount > 1) { await semLock.WaitAsync(); } - Tod

6
  1. 获取一个信号量。
  2. 将容量设置为比您实际需要的要高得多的值。
  3. 将初始容量设置为您希望实际最大容量的值。
  4. 将信号量分配给其他人使用。

此时,您可以随意等待信号量(没有相应的释放调用)以降低容量。 您可以多次释放信号量(没有相应的等待调用)以增加有效容量。

如果这是您经常做的事情,则可以创建自己的信号量类,该类组合了SemaphoreSlim并封装了此逻辑。 如果您的代码已经在释放信号量之前没有等待它,请使用自己的类来确保这样的释放是无操作的。 (也就是说,你真的应该避免自己陷入这种境地。)


或者只需调用构造函数:http://msdn.microsoft.com/zh-cn/library/dd270891(v=vs.110).aspx - Jim Mischel
即使使用封装SemaphoreSlim的自定义类,我仍然需要灵活性来调整最大并发调用量。例如从1000开始,改为600,一段时间后再改为1700。 - Thiago Custodio
@JimMischel 当然可以,不过如果你想真正改变正确的最大值,你需要将其组合成另一种类型,这样你就可以确保在已经达到最大值但没有先增加最大值的情况下释放它会变成无操作(或异常)。 - Servy
@ThiagoCustodio 是的,这有什么问题吗?您可以编写一个SetMaxium方法,根据当前和期望最大值之间的差异,选择等待或释放。 - Servy
我应该更具体一些。我的意思是,如果你在步骤2中使用了那个构造函数,那么步骤3就可以被省略掉。至于你答案的其余部分(以及你的评论),我完全同意。没有封装,这种事情可能会非常危险。 - Jim Mischel
显示剩余3条评论

3

这是我解决这种情况的方式:我创建了一个自定义的信号量紧凑类,允许我增加和减少插槽的数量。这个类还允许我设置最大插槽数量,以便我永远不会超过“合理”的数量,同时也可以设置最小插槽数量,以便我不会低于“合理”的阈值。

using Picton.Messaging.Logging;
using System;
using System.Threading;

namespace Picton.Messaging.Utils
{
    /// <summary>
    /// An improvement over System.Threading.SemaphoreSlim that allows you to dynamically increase and
    /// decrease the number of threads that can access a resource or pool of resources concurrently.
    /// </summary>
    /// <seealso cref="System.Threading.SemaphoreSlim" />
    public class SemaphoreSlimDynamic : SemaphoreSlim
    {
        #region FIELDS

        private static readonly ILog _logger = LogProvider.GetLogger(typeof(SemaphoreSlimDynamic));
        private readonly ReaderWriterLockSlim _lock;

        #endregion

        #region PROPERTIES

        /// <summary>
        /// Gets the minimum number of slots.
        /// </summary>
        /// <value>
        /// The minimum slots count.
        /// </value>
        public int MinimumSlotsCount { get; private set; }

        /// <summary>
        /// Gets the number of slots currently available.
        /// </summary>
        /// <value>
        /// The available slots count.
        /// </value>
        public int AvailableSlotsCount { get; private set; }

        /// <summary>
        /// Gets the maximum number of slots.
        /// </summary>
        /// <value>
        /// The maximum slots count.
        /// </value>
        public int MaximumSlotsCount { get; private set; }

        #endregion

        #region CONSTRUCTOR

        /// <summary>
        /// Initializes a new instance of the <see cref="SemaphoreSlimDynamic"/> class.
        /// </summary>
        /// <param name="minCount">The minimum number of slots.</param>
        /// <param name="initialCount">The initial number of slots.</param>
        /// <param name="maxCount">The maximum number of slots.</param>
        public SemaphoreSlimDynamic(int minCount, int initialCount, int maxCount)
            : base(initialCount, maxCount)
        {
            _lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);

            this.MinimumSlotsCount = minCount;
            this.AvailableSlotsCount = initialCount;
            this.MaximumSlotsCount = maxCount;
        }

        #endregion

        #region PUBLIC METHODS

        /// <summary>
        /// Attempts to increase the number of slots
        /// </summary>
        /// <param name="millisecondsTimeout">The timeout in milliseconds.</param>
        /// <param name="increaseCount">The number of slots to add</param>
        /// <returns>true if the attempt was successfully; otherwise, false.</returns>
        public bool TryIncrease(int millisecondsTimeout = 500, int increaseCount = 1)
        {
            return TryIncrease(TimeSpan.FromMilliseconds(millisecondsTimeout), increaseCount);
        }

        /// <summary>
        /// Attempts to increase the number of slots
        /// </summary>
        /// <param name="timeout">The timeout.</param>
        /// <param name="increaseCount">The number of slots to add</param>
        /// <returns>true if the attempt was successfully; otherwise, false.</returns>
        public bool TryIncrease(TimeSpan timeout, int increaseCount = 1)
        {
            if (increaseCount < 0) throw new ArgumentOutOfRangeException(nameof(increaseCount));
            else if (increaseCount == 0) return false;

            var increased = false;

            try
            {
                if (this.AvailableSlotsCount < this.MaximumSlotsCount)
                {
                    var lockAcquired = _lock.TryEnterWriteLock(timeout);
                    if (lockAcquired)
                    {
                        for (int i = 0; i < increaseCount; i++)
                        {
                            if (this.AvailableSlotsCount < this.MaximumSlotsCount)
                            {
                                Release();
                                this.AvailableSlotsCount++;
                                increased = true;
                            }
                        }

                        if (increased) _logger.Trace($"Semaphore slots increased: {this.AvailableSlotsCount}");

                        _lock.ExitWriteLock();
                    }
                }
            }
            catch (SemaphoreFullException)
            {
                // An exception is thrown if we attempt to exceed the max number of concurrent tasks
                // It's safe to ignore this exception
            }

            return increased;
        }

        /// <summary>
        /// Attempts to decrease the number of slots
        /// </summary>
        /// <param name="millisecondsTimeout">The timeout in milliseconds.</param>
        /// <param name="decreaseCount">The number of slots to add</param>
        /// <returns>true if the attempt was successfully; otherwise, false.</returns>
        public bool TryDecrease(int millisecondsTimeout = 500, int decreaseCount = 1)
        {
            return TryDecrease(TimeSpan.FromMilliseconds(millisecondsTimeout), decreaseCount);
        }

        /// <summary>
        /// Attempts to decrease the number of slots
        /// </summary>
        /// <param name="timeout">The timeout.</param>
        /// <param name="decreaseCount">The number of slots to add</param>
        /// <returns>true if the attempt was successfully; otherwise, false.</returns>
        public bool TryDecrease(TimeSpan timeout, int decreaseCount = 1)
        {
            if (decreaseCount < 0) throw new ArgumentOutOfRangeException(nameof(decreaseCount));
            else if (decreaseCount == 0) return false;

            var decreased = false;

            if (this.AvailableSlotsCount > this.MinimumSlotsCount)
            {
                var lockAcquired = _lock.TryEnterWriteLock(timeout);
                if (lockAcquired)
                {
                    for (int i = 0; i < decreaseCount; i++)
                    {
                        if (this.AvailableSlotsCount > this.MinimumSlotsCount)
                        {
                            if (Wait(timeout))
                            {
                                this.AvailableSlotsCount--;
                                decreased = true;
                            }
                        }
                    }

                    if (decreased) _logger.Trace($"Semaphore slots decreased: {this.AvailableSlotsCount}");

                    _lock.ExitWriteLock();
                }
            }

            return decreased;
        }

        #endregion
    }
}

2

好的,我可以在Mono项目中找到解决我的问题的方法。

// SemaphoreSlim.cs
//
// Copyright (c) 2008 Jérémie "Garuma" Laval
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//
//

using System;
using System.Diagnostics;
using System.Threading.Tasks;

namespace System.Threading
{
    public class SemaphoreSlimCustom : IDisposable
    {
        const int spinCount = 10;
        const int deepSleepTime = 20;
        private object _sync = new object();


        int maxCount;
        int currCount;
        bool isDisposed;

        public int MaxCount
        {
            get { lock (_sync) { return maxCount; } }
            set
            {
                lock (_sync)
                {
                    maxCount = value;
                }
            }
        }

        EventWaitHandle handle;

        public SemaphoreSlimCustom (int initialCount) : this (initialCount, int.MaxValue)
        {
        }

        public SemaphoreSlimCustom (int initialCount, int maxCount)
        {
            if (initialCount < 0 || initialCount > maxCount || maxCount < 0)
                throw new ArgumentOutOfRangeException ("The initialCount  argument is negative, initialCount is greater than maxCount, or maxCount is not positive.");

            this.maxCount = maxCount;
            this.currCount = initialCount;
            this.handle = new ManualResetEvent (initialCount > 0);
        }

        public void Dispose ()
        {
            Dispose(true);
        }

        protected virtual void Dispose (bool disposing)
        {
            isDisposed = true;
        }

        void CheckState ()
        {
            if (isDisposed)
                throw new ObjectDisposedException ("The SemaphoreSlim has been disposed.");
        }

        public int CurrentCount {
            get {
                return currCount;
            }
        }

        public int Release ()
        {
            return Release(1);
        }

        public int Release (int releaseCount)
        {
            CheckState ();
            if (releaseCount < 1)
                throw new ArgumentOutOfRangeException ("releaseCount", "releaseCount is less than 1");

            // As we have to take care of the max limit we resort to CAS
            int oldValue, newValue;
            do {
                oldValue = currCount;
                newValue = (currCount + releaseCount);
                newValue = newValue > maxCount ? maxCount : newValue;
            } while (Interlocked.CompareExchange (ref currCount, newValue, oldValue) != oldValue);

            handle.Set ();

            return oldValue;
        }

        public void Wait ()
        {
            Wait (CancellationToken.None);
        }

        public bool Wait (TimeSpan timeout)
        {
            return Wait ((int)timeout.TotalMilliseconds, CancellationToken.None);
        }

        public bool Wait (int millisecondsTimeout)
        {
            return Wait (millisecondsTimeout, CancellationToken.None);
        }

        public void Wait (CancellationToken cancellationToken)
        {
            Wait (-1, cancellationToken);
        }

        public bool Wait (TimeSpan timeout, CancellationToken cancellationToken)
        {
            CheckState();
            return Wait ((int)timeout.TotalMilliseconds, cancellationToken);
        }

        public bool Wait (int millisecondsTimeout, CancellationToken cancellationToken)
        {
            CheckState ();
            if (millisecondsTimeout < -1)
                throw new ArgumentOutOfRangeException ("millisecondsTimeout",
                                                       "millisecondsTimeout is a negative number other than -1");

            Stopwatch sw = Stopwatch.StartNew();

            Func<bool> stopCondition = () => millisecondsTimeout >= 0 && sw.ElapsedMilliseconds > millisecondsTimeout;

            do {
                bool shouldWait;
                int result;

                do {
                    cancellationToken.ThrowIfCancellationRequested ();
                    if (stopCondition ())
                        return false;

                    shouldWait = true;
                    result = currCount;

                    if (result > 0)
                        shouldWait = false;
                    else
                        break;
                } while (Interlocked.CompareExchange (ref currCount, result - 1, result) != result);

                if (!shouldWait) {
                    if (result == 1)
                        handle.Reset ();
                    break;
                }

                SpinWait wait = new SpinWait ();

                while (Thread.VolatileRead (ref currCount) <= 0) {
                    cancellationToken.ThrowIfCancellationRequested ();
                    if (stopCondition ())
                        return false;

                    if (wait.Count > spinCount) {
                        int diff = millisecondsTimeout - (int)sw.ElapsedMilliseconds;

                        int timeout = millisecondsTimeout < 0 ? deepSleepTime :


                            Math.Min (Math.Max (diff, 1), deepSleepTime);
                        handle.WaitOne (timeout);
                    } else
                        wait.SpinOnce ();
                }
            } while (true);

            return true;
        }

        public WaitHandle AvailableWaitHandle {
            get {
                return handle;
            }
        }

        public Task WaitAsync ()
        {
            return Task.Factory.StartNew (() => Wait ());
        }

        public Task WaitAsync (CancellationToken cancellationToken)
        {
            return Task.Factory.StartNew (() => Wait (cancellationToken), cancellationToken);
        }

        public Task<bool> WaitAsync (int millisecondsTimeout)
        {
            return Task.Factory.StartNew (() => Wait (millisecondsTimeout));
        }

        public Task<bool> WaitAsync (TimeSpan timeout)
        {
            return Task.Factory.StartNew (() => Wait (timeout));
        }

        public Task<bool> WaitAsync (int millisecondsTimeout, CancellationToken cancellationToken)
        {
            return Task.Factory.StartNew (() => Wait (millisecondsTimeout, cancellationToken), cancellationToken);
        }

        public Task<bool> WaitAsync (TimeSpan timeout, CancellationToken cancellationToken)
        {
            return Task.Factory.StartNew (() => Wait (timeout, cancellationToken), cancellationToken);
        }
    }
}

-1

更新的 .Net Core 5 答案:

假设我想要一个最多只能处理10个请求的锁,但大部分时间我只需要1个。

private readonly static SemaphoreSlim semLock = new(1, 10);

现在当我想要释放一些资源时,我可以这样做:

semLock.Release(Math.Min(9, requiredAmount));

请注意,9比10少一个,因为我们最初已经发布了一个版本。
一旦我想再次限制可用资源,我可以调用:
while(semLock.CurrentCount > 1)
{
    await semLock.WaitAsync();
}

这将等待将其降至1


检查CurrentCount将显示错误的值。如果semLock正在使用中,则可能已经CurrentCount == 0,但在下一秒钟会回到10。 正确的解决方案需要有一个单独的计数器来检查在给定锁上调用WaitAsync的次数。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接