SemaphoreSlim with dynamic maxCount

Posted 2019-05-11 00:39

Question:

I need to limit the number of concurrent calls to another web server. The limit will vary because the server is shared, so its available capacity may go up or down.

I was thinking about using the SemaphoreSlim class, but it has no public property for changing the max count.

Should I wrap my SemaphoreSlim in another class that handles the max count? Is there a better approach?

EDIT:

Here's what I'm trying:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Semaphore
{
class Program
{
    static SemaphoreSlim _sem = new SemaphoreSlim(10,10000);

    static void Main(string[] args)
    {
        int max = 15;

        for (int i = 1; i <= 50; i++)
        {
            new Thread(Enter).Start(new int[] { i, max});
        }

        Console.ReadLine();

        max = 11;

        for (int i = 1; i <= 50; i++)
        {
            new Thread(Enter).Start(new int[] { i, max });
        }
    }

    static void Enter(object param)
    {
        int[] arr = (int[])param;
        int id = arr[0];
        int max = arr[1];

        try
        {
            Console.WriteLine(_sem.CurrentCount);

            if (_sem.CurrentCount <= max)
                _sem.Release(1);
            else
            {
                _sem.Wait(1000);

                Console.WriteLine(id + " wants to enter");

                Thread.Sleep((1000 * id) / 2); // can be here at

                Console.WriteLine(id + " is in!"); // Only three threads

            }
        }
        catch(Exception ex)
        {
            Console.WriteLine("oops " + id);
            Console.WriteLine(ex.Message);
        }
        finally            
        {
            _sem.Release();
        }
    }
}
}

Questions:

1. Shouldn't _sem.Wait(1000) cancel the execution of threads that run for more than 1000 ms?

2. Did I get the idea of using Release / Wait right?

Answer 1:

You can't change the max count, but you can create a SemaphoreSlim that has a very high maximum count and hold some of those permits in reserve. See the SemaphoreSlim(Int32, Int32) constructor, which takes an initial count and a maximum count.

So let's say that the absolute maximum number of concurrent calls is 100, but initially you want it to be 25. You initialize your semaphore:

SemaphoreSlim sem = new SemaphoreSlim(25, 100);

So 25 is the number of requests that can be serviced concurrently. You have reserved the other 75.

If you then want to increase the number allowed, just call Release(num). If you called Release(10), then the number would go to 35.

Now, if you want to reduce the number of available requests, you have to call Wait multiple times (SemaphoreSlim has no WaitOne method; that belongs to WaitHandle-based types such as Semaphore). For example, if you want to remove 10 from the available count:

for (var i = 0; i < 10; ++i)
{
    sem.Wait();
}

This has the potential of blocking until other clients release the semaphore. That is, if you allow 35 concurrent requests and you want to reduce it to 25, but there are already 35 clients with active requests, that Wait will block until a client calls Release, and the loop won't terminate until 10 clients release.
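
The grow and shrink steps described above can be sketched as follows. This is only a minimal illustration using the 25/100 numbers from this answer; `SemaphoreResizeDemo` and `TryShrink` are hypothetical names, not part of any library. It calls `SemaphoreSlim.Wait` with a timeout so that shrinking gives up instead of blocking indefinitely when all permits are in use.

```csharp
using System;
using System.Threading;

public static class SemaphoreResizeDemo
{
    // Hypothetical helper: tries to take `count` permits out of circulation.
    // Returns how many permits were actually removed before a timeout hit.
    public static int TryShrink(SemaphoreSlim sem, int count, TimeSpan perPermitTimeout)
    {
        int removed = 0;
        for (int i = 0; i < count; i++)
        {
            // Wait(TimeSpan) returns false on timeout rather than blocking forever.
            if (!sem.Wait(perPermitTimeout))
                break;
            removed++;
        }
        return removed;
    }

    public static void Main()
    {
        var sem = new SemaphoreSlim(25, 100); // 25 available, 75 held in reserve

        sem.Release(10);                      // grow: 25 -> 35
        Console.WriteLine(sem.CurrentCount);  // 35

        int removed = TryShrink(sem, 10, TimeSpan.FromMilliseconds(100));
        Console.WriteLine(removed);           // 10 (no permits are in use here)
        Console.WriteLine(sem.CurrentCount);  // 25
    }
}
```

If `TryShrink` returns fewer than requested, the caller still "owes" the difference and would have to retry later; the permits it did remove must not be released again, or the limit creeps back up.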



Answer 2:

  1. Get a semaphore.
  2. Set the capacity to something quite a bit higher than you need it to be.
  3. Set the initial capacity to what you want your actual maximum capacity to be.
  4. Give out the semaphore to others to use.

At this point you can then wait on the semaphore however much you want (without a corresponding release call) to lower the capacity. You can release the semaphore a number of times (without a corresponding wait call) to increase the effective capacity.

If this is something you're doing enough of, you can potentially create your own semaphore class that composes a SemaphoreSlim and encapsulates this logic. This composition will also be essential if you have code that already releases a semaphore without first waiting on it; with your own class you could ensure that such releases are no-ops. (That said, you should avoid putting yourself in that position to begin with, really.)
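
A wrapper along these lines might look like the sketch below. It is one possible design, not a definitive implementation: the class name `ResizableSemaphore` and its members are hypothetical. Instead of blocking when the limit is lowered while permits are in use, it records the shortfall and lets later `Release` calls absorb it. It does not try to detect a release without a prior wait.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper that composes a SemaphoreSlim and exposes an adjustable limit.
public sealed class ResizableSemaphore : IDisposable
{
    private readonly SemaphoreSlim _inner;
    private readonly object _gate = new object();
    private int _limit;          // current effective capacity
    private int _pendingRemoval; // permits to withdraw as current holders release

    public ResizableSemaphore(int initialLimit)
    {
        if (initialLimit < 0) throw new ArgumentOutOfRangeException(nameof(initialLimit));
        _limit = initialLimit;
        // Very high maximum so the inner semaphore never rejects a Release.
        _inner = new SemaphoreSlim(initialLimit, int.MaxValue);
    }

    public int Limit { get { lock (_gate) { return _limit; } } }

    public Task WaitAsync(CancellationToken ct = default(CancellationToken))
    {
        return _inner.WaitAsync(ct);
    }

    public bool Wait(int millisecondsTimeout)
    {
        return _inner.Wait(millisecondsTimeout);
    }

    public void Release()
    {
        lock (_gate)
        {
            // If the limit was lowered while permits were held, swallow this release.
            if (_pendingRemoval > 0) { _pendingRemoval--; return; }
            _inner.Release();
        }
    }

    public void SetLimit(int newLimit)
    {
        if (newLimit < 0) throw new ArgumentOutOfRangeException(nameof(newLimit));
        lock (_gate)
        {
            int delta = newLimit - _limit;
            _limit = newLimit;
            if (delta > 0)
            {
                // Cancel outstanding removals first, then add real permits.
                int cancelled = Math.Min(delta, _pendingRemoval);
                _pendingRemoval -= cancelled;
                if (delta > cancelled) _inner.Release(delta - cancelled);
            }
            else
            {
                for (int i = 0; i < -delta; i++)
                {
                    // Take a free permit if one exists; otherwise defer the removal.
                    if (!_inner.Wait(0)) _pendingRemoval++;
                }
            }
        }
    }

    public void Dispose() { _inner.Dispose(); }
}
```

With this shape, `SetLimit` never blocks: lowering the limit takes effect immediately for free permits and gradually for permits that are currently held.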



Answer 3:

OK, I solved my problem by looking at the SemaphoreSlim implementation in the Mono project and adapting it so that the max count can be changed.

// SemaphoreSlim.cs
//
// Copyright (c) 2008 Jérémie "Garuma" Laval
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//
//

using System;
using System.Diagnostics;
using System.Threading.Tasks;

namespace System.Threading
{
    public class SemaphoreSlimCustom : IDisposable
    {
        const int spinCount = 10;
        const int deepSleepTime = 20;
        private object _sync = new object();


        int maxCount;
        int currCount;
        bool isDisposed;

        public int MaxCount
        {
            get { lock (_sync) { return maxCount; } }
            set
            {
                lock (_sync)
                {
                    maxCount = value;
                }
            }
        }

        EventWaitHandle handle;

        public SemaphoreSlimCustom (int initialCount) : this (initialCount, int.MaxValue)
        {
        }

        public SemaphoreSlimCustom (int initialCount, int maxCount)
        {
            if (initialCount < 0 || initialCount > maxCount || maxCount < 0)
                throw new ArgumentOutOfRangeException ("initialCount", "The initialCount argument is negative, initialCount is greater than maxCount, or maxCount is negative.");

            this.maxCount = maxCount;
            this.currCount = initialCount;
            this.handle = new ManualResetEvent (initialCount > 0);
        }

        public void Dispose ()
        {
            Dispose(true);
        }

        protected virtual void Dispose (bool disposing)
        {
            isDisposed = true;
        }

        void CheckState ()
        {
            if (isDisposed)
                throw new ObjectDisposedException ("The SemaphoreSlim has been disposed.");
        }

        public int CurrentCount {
            get {
                return currCount;
            }
        }

        public int Release ()
        {
            return Release(1);
        }

        public int Release (int releaseCount)
        {
            CheckState ();
            if (releaseCount < 1)
                throw new ArgumentOutOfRangeException ("releaseCount", "releaseCount is less than 1");

            // As we have to take care of the max limit we resort to CAS
            int oldValue, newValue;
            do {
                oldValue = currCount;
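                // Compute from the snapshot, not from a second read of currCount,
                // so the CAS below compares against the value actually used.
                newValue = (oldValue + releaseCount);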
                newValue = newValue > maxCount ? maxCount : newValue;
            } while (Interlocked.CompareExchange (ref currCount, newValue, oldValue) != oldValue);

            handle.Set ();

            return oldValue;
        }

        public void Wait ()
        {
            Wait (CancellationToken.None);
        }

        public bool Wait (TimeSpan timeout)
        {
            return Wait ((int)timeout.TotalMilliseconds, CancellationToken.None);
        }

        public bool Wait (int millisecondsTimeout)
        {
            return Wait (millisecondsTimeout, CancellationToken.None);
        }

        public void Wait (CancellationToken cancellationToken)
        {
            Wait (-1, cancellationToken);
        }

        public bool Wait (TimeSpan timeout, CancellationToken cancellationToken)
        {
            CheckState();
            return Wait ((int)timeout.TotalMilliseconds, cancellationToken);
        }

        public bool Wait (int millisecondsTimeout, CancellationToken cancellationToken)
        {
            CheckState ();
            if (millisecondsTimeout < -1)
                throw new ArgumentOutOfRangeException ("millisecondsTimeout",
                                                       "millisecondsTimeout is a negative number other than -1");

            Stopwatch sw = Stopwatch.StartNew();

            Func<bool> stopCondition = () => millisecondsTimeout >= 0 && sw.ElapsedMilliseconds > millisecondsTimeout;

            do {
                bool shouldWait;
                int result;

                do {
                    cancellationToken.ThrowIfCancellationRequested ();
                    if (stopCondition ())
                        return false;

                    shouldWait = true;
                    result = currCount;

                    if (result > 0)
                        shouldWait = false;
                    else
                        break;
                } while (Interlocked.CompareExchange (ref currCount, result - 1, result) != result);

                if (!shouldWait) {
                    if (result == 1)
                        handle.Reset ();
                    break;
                }

                SpinWait wait = new SpinWait ();

                while (Thread.VolatileRead (ref currCount) <= 0) {
                    cancellationToken.ThrowIfCancellationRequested ();
                    if (stopCondition ())
                        return false;

                    if (wait.Count > spinCount) {
                        int diff = millisecondsTimeout - (int)sw.ElapsedMilliseconds;

                        int timeout = millisecondsTimeout < 0 ? deepSleepTime :
                            Math.Min (Math.Max (diff, 1), deepSleepTime);
                        handle.WaitOne (timeout);
                    } else
                        wait.SpinOnce ();
                }
            } while (true);

            return true;
        }

        public WaitHandle AvailableWaitHandle {
            get {
                return handle;
            }
        }

        public Task WaitAsync ()
        {
            return Task.Factory.StartNew (() => Wait ());
        }

        public Task WaitAsync (CancellationToken cancellationToken)
        {
            return Task.Factory.StartNew (() => Wait (cancellationToken), cancellationToken);
        }

        public Task<bool> WaitAsync (int millisecondsTimeout)
        {
            return Task.Factory.StartNew (() => Wait (millisecondsTimeout));
        }

        public Task<bool> WaitAsync (TimeSpan timeout)
        {
            return Task.Factory.StartNew (() => Wait (timeout));
        }

        public Task<bool> WaitAsync (int millisecondsTimeout, CancellationToken cancellationToken)
        {
            return Task.Factory.StartNew (() => Wait (millisecondsTimeout, cancellationToken), cancellationToken);
        }

        public Task<bool> WaitAsync (TimeSpan timeout, CancellationToken cancellationToken)
        {
            return Task.Factory.StartNew (() => Wait (timeout, cancellationToken), cancellationToken);
        }
    }
}