Queue of async tasks with throttling which supports multi-threading

Posted 2020-02-08 02:59

I need to implement a library to access the vk.com API. The problem is that the API supports only 3 requests per second. I would like the API to be asynchronous.

Important: the API should support safe access from multiple threads.

My idea is to implement a class called throttler which allows no more than 3 requests per second and delays any other requests.

The interface looks like this:

public interface IThrottler : IDisposable
{
    Task<TResult> Throttle<TResult>(Func<Task<TResult>> task);
}

The usage looks like this:

var audio = await throttler.Throttle(() => api.MyAudio());
var messages = await throttler.Throttle(() => api.ReadMessages());
var audioLyrics = await throttler.Throttle(() => api.AudioLyrics(audioId));
// A delay should happen here because 3 requests have already been executed
var photo = await throttler.Throttle(() => api.MyPhoto());

How can I implement the throttler?

Currently I have implemented it as a queue that is processed by a background thread.

public Task<TResult> Throttle<TResult>(Func<Task<TResult>> task)
{
    // TaskRequest has a Run() method that runs the task.
    // TaskRequest uses a TaskCompletionSource to provide a new task
    // which is resolved when the queue has been processed up to this element.
    var request = new TaskRequest<TResult>(task);

    requestQueue.Enqueue(request);

    return request.ResultTask;
}
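
TaskRequest itself isn't shown here; below is a minimal sketch of what it could look like, based only on the comments above (the exact shape and member names are assumptions):

// A minimal sketch of IRequest / TaskRequest<TResult> as described in the
// comments above; the exact implementation is an assumption.
private interface IRequest
{
    void Run();
}

private class TaskRequest<TResult> : IRequest
{
    private readonly Func<Task<TResult>> factory;
    private readonly TaskCompletionSource<TResult> tcs = new TaskCompletionSource<TResult>();

    public TaskRequest(Func<Task<TResult>> factory)
    {
        this.factory = factory;
    }

    // The task handed back to the caller of Throttle().
    public Task<TResult> ResultTask
    {
        get { return tcs.Task; }
    }

    // Called by the background thread when the queue reaches this element.
    public void Run()
    {
        factory().ContinueWith(t =>
        {
            if (t.IsFaulted)
                tcs.TrySetException(t.Exception.InnerExceptions);
            else if (t.IsCanceled)
                tcs.TrySetCanceled();
            else
                tcs.TrySetResult(t.Result);
        });
    }
}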

This is a shortened version of the background thread loop which processes the queue:

private void ProcessQueue(object state)
{
    while (true)
    {
        IRequest request;
        while (requestQueue.TryDequeue(out request))
        {
            // The Delay() method calculates the actual delay value and calls Thread.Sleep()
            Delay();
            request.Run();
        }

    }
}
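
Delay() itself is not shown either; here is a rough sketch of what it could do for the 3-requests-per-second limit (the field and constant names are assumptions):

// A rough sketch of Delay(): keep the start times of the last MaxRequests runs
// and sleep until the oldest of them is at least one interval old.
// The field and constant names here are assumptions.
private readonly Queue<DateTime> lastRuns = new Queue<DateTime>();
private const int MaxRequests = 3;
private static readonly TimeSpan Interval = TimeSpan.FromSeconds(1);

private void Delay()
{
    // Keep only the timestamps of the last MaxRequests runs.
    while (lastRuns.Count > MaxRequests)
        lastRuns.Dequeue();

    // If MaxRequests runs already happened, wait until the oldest of them
    // is a full interval in the past.
    if (lastRuns.Count >= MaxRequests)
    {
        var wait = Interval - (DateTime.UtcNow - lastRuns.Peek());
        if (wait > TimeSpan.Zero)
            Thread.Sleep(wait);
    }

    lastRuns.Enqueue(DateTime.UtcNow);
}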

Is it possible to implement this without a background thread?

5 Answers

Answer by 看我几分像从前 · 2020-02-08 03:43

Edit: this solution works, but use it only if it is OK to process all requests serially (on one thread). Otherwise, use the solution accepted as the answer.

Well, thanks go to the question "Best way in .NET to manage queue of tasks on a separate (single) thread".

My question is almost a duplicate, except for adding a delay before execution, which is actually simple.

The main helper here is the SemaphoreSlim class, which allows restricting the degree of parallelism.

So, first create a semaphore:

// The semaphore allows only 1 task to run at a time.
private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);

And the final version of Throttle looks like this:

public async Task<TResult> Throttle<TResult>(Func<Task<TResult>> task)
{
    await semaphore.WaitAsync();
    try
    {
        await delaySource.Delay();
        return await task();
    }
    finally
    {
        semaphore.Release();
    }
}

The delay source is also pretty simple:

private class TaskDelaySource
{
    private readonly int maxTasks;
    private readonly TimeSpan inInterval;
    private readonly Queue<long> ticks = new Queue<long>();

    public TaskDelaySource(int maxTasks, TimeSpan inInterval)
    {
        this.maxTasks = maxTasks;
        this.inInterval = inInterval;
    }

    public async Task Delay()
    {
        // Keep only the timestamps of the last maxTasks runs.
        while (ticks.Count > maxTasks)
            ticks.Dequeue();

        // Only delay once maxTasks runs have been recorded; with fewer entries
        // the oldest tick is not yet maxTasks requests back, and delaying on it
        // would throttle the first requests unnecessarily.
        if (ticks.Count >= maxTasks)
        {
            var now = DateTime.UtcNow.Ticks;
            var oldestTick = ticks.Peek();
            // Interval between the run maxTasks requests back and now
            var intervalSinceLastTask = TimeSpan.FromTicks(now - oldestTick);

            if (intervalSinceLastTask < inInterval)
                await Task.Delay(inInterval - intervalSinceLastTask);
        }

        ticks.Enqueue(DateTime.UtcNow.Ticks);
    }
}
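
For completeness, the delaySource field used in Throttle above would be declared roughly like this, configured for the question's limit of 3 requests per second. TaskDelaySource is not thread-safe on its own, but that is fine here because the semaphore lets only one caller into Delay() at a time.

// Assumed declaration of the delay source used in Throttle above,
// configured for 3 requests per second.
private readonly TaskDelaySource delaySource = new TaskDelaySource(3, TimeSpan.FromSeconds(1));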

Answer by 贪生不怕死 · 2020-02-08 03:49

Here is one solution that uses a Stopwatch:

public class Throttler : IThrottler
{
    private readonly Stopwatch m_Stopwatch;
    private int m_NumberOfRequestsInLastSecond;
    private readonly int m_MaxNumberOfRequestsPerSecond;

    public Throttler(int max_number_of_requests_per_second)
    {
        m_MaxNumberOfRequestsPerSecond = max_number_of_requests_per_second;
        m_Stopwatch = Stopwatch.StartNew();
    }


    public async Task<TResult> Throttle<TResult>(Func<Task<TResult>> task)
    {
        var elapsed = m_Stopwatch.Elapsed;

        if (elapsed > TimeSpan.FromSeconds(1))
        {
            m_NumberOfRequestsInLastSecond = 1;

            m_Stopwatch.Restart();

            return await task();
        }

        if (m_NumberOfRequestsInLastSecond >= m_MaxNumberOfRequestsPerSecond)
        {
            TimeSpan time_to_wait = TimeSpan.FromSeconds(1) - elapsed;

            await Task.Delay(time_to_wait);

            m_NumberOfRequestsInLastSecond = 1;

            m_Stopwatch.Restart();

            return await task();
        }

        m_NumberOfRequestsInLastSecond++;

        return await task();
    }
}

Here is how this code can be tested:

class Program
{
    static void Main(string[] args)
    {
        DoIt();

        Console.ReadLine();
    }

    static async Task DoIt()
    {
        Func<Task<int>> func = async () =>
        {
            await Task.Delay(100);
            return 1;
        };

        Throttler throttler = new Throttler(3);

        for (int i = 0; i < 10; i++)
        {
            var result = await throttler.Throttle(func);

            Console.WriteLine(DateTime.Now);
        }            
    }
}

Answer by chillily · 2020-02-08 03:50

So we'll start out with a solution to a simpler problem: creating a queue that processes up to N tasks concurrently, rather than throttling to N tasks started per second, and then build on that:

public class TaskQueue
{
    private SemaphoreSlim semaphore;
    public TaskQueue()
    {
        semaphore = new SemaphoreSlim(1);
    }
    public TaskQueue(int concurrentRequests)
    {
        semaphore = new SemaphoreSlim(concurrentRequests);
    }

    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            return await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
    public async Task Enqueue(Func<Task> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
}
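
As a quick illustration of TaskQueue on its own (a hypothetical example; the urls collection and the HttpClient instance are assumed to exist in the calling code), at most two downloads run at the same time here:

// Hypothetical usage of TaskQueue: at most 2 downloads run concurrently.
var queue = new TaskQueue(2);
var downloads = urls.Select(url => queue.Enqueue(() => httpClient.GetStringAsync(url)));
string[] pages = await Task.WhenAll(downloads);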

We'll also use the following helper methods to propagate the result of a Task to a TaskCompletionSource:

public static class TaskCompletionSourceExtensions
{
    // Extension methods need to be declared in a non-generic static class.
    public static void Match<T>(this TaskCompletionSource<T> tcs, Task<T> task)
    {
        task.ContinueWith(t =>
        {
            switch (t.Status)
            {
                case TaskStatus.Canceled:
                    tcs.SetCanceled();
                    break;
                case TaskStatus.Faulted:
                    tcs.SetException(t.Exception.InnerExceptions);
                    break;
                case TaskStatus.RanToCompletion:
                    tcs.SetResult(t.Result);
                    break;
            }
        });
    }

    public static void Match<T>(this TaskCompletionSource<T> tcs, Task task)
    {
        Match(tcs, task.ContinueWith(t => default(T)));
    }
}
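
As a quick illustration of what Match does (a hypothetical example): it completes a TaskCompletionSource with whatever the given task produces, including faults and cancellation:

// Hypothetical illustration: the TCS completes with the task's result.
var tcs = new TaskCompletionSource<int>();
tcs.Match(Task.Run(() => 42));
int value = await tcs.Task; // 42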

Now for our actual solution. Each time we need to perform a throttled operation, we create a TaskCompletionSource and then enqueue an item into our TaskQueue that starts the task, matches the TCS to its result without awaiting it, and then delays the task queue for 1 second. The task queue will then not let an operation start while N tasks have already been started within the past second, while the result of the operation itself is the same as the created Task:

public class Throttler
{
    private TaskQueue queue;
    public Throttler(int requestsPerSecond)
    {
        queue = new TaskQueue(requestsPerSecond);
    }
    public Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        var unused = queue.Enqueue(() =>
        {
            tcs.Match(taskGenerator());
            return Task.Delay(TimeSpan.FromSeconds(1));
        });
        return tcs.Task;
    }
    public Task Enqueue(Func<Task> taskGenerator)
    {
        TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
        var unused = queue.Enqueue(() =>
        {
            tcs.Match(taskGenerator());
            return Task.Delay(TimeSpan.FromSeconds(1));
        });
        return tcs.Task;
    }
}
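
Applied to the question's example, usage would look roughly like this (api and audioId are the objects from the question's usage example; note the method here is called Enqueue rather than Throttle):

// Hypothetical usage with the question's api object: the first three calls
// start immediately, the fourth only starts about one second after the first.
var throttler = new Throttler(3);
var audio = await throttler.Enqueue(() => api.MyAudio());
var messages = await throttler.Enqueue(() => api.ReadMessages());
var audioLyrics = await throttler.Enqueue(() => api.AudioLyrics(audioId));
var photo = await throttler.Enqueue(() => api.MyPhoto());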

Answer by beautiful° · 2020-02-08 03:51

You can use this as a generic task throttler:

public class TaskThrottle
{
    // Limits how many of the supplied tasks run in parallel.
    private readonly SemaphoreSlim _semaphore;

    public TaskThrottle(int maxTasksToRunInParallel)
    {
        _semaphore = new SemaphoreSlim(maxTasksToRunInParallel);
    }

    public void TaskThrottler<T>(IEnumerable<Task<T>> tasks, int timeoutInMilliseconds, CancellationToken cancellationToken = default(CancellationToken)) where T : class
    {
        // Materialize the tasks as a list (List<T>.ForEach is used below)
        var taskList = tasks.ToList();
        var postTasks = new List<Task<int>>();

        // When each task completes, it releases one slot in the semaphore
        taskList.ForEach(x =>
        {
            postTasks.Add(x.ContinueWith(y => _semaphore.Release(), cancellationToken));
        });

        taskList.ForEach(x =>
        {
            // Wait for an open slot, then start the (cold) task
            _semaphore.Wait(timeoutInMilliseconds, cancellationToken);
            cancellationToken.ThrowIfCancellationRequested();
            x.Start();
        });

        Task.WaitAll(taskList.ToArray(), cancellationToken);
    }
}
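
A hypothetical usage sketch (FetchPage is a placeholder for real work): the tasks must be created "cold" via the Task constructor, because TaskThrottler starts them itself with Start().

// Hypothetical usage: the tasks are created cold (not started) because
// TaskThrottler calls Start() on each one. FetchPage is a placeholder.
var throttle = new TaskThrottle(maxTasksToRunInParallel: 3);
var tasks = Enumerable.Range(0, 10)
    .Select(i => new Task<string>(() => FetchPage(i)))
    .ToList();
throttle.TaskThrottler(tasks, timeoutInMilliseconds: 5000);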

Answer by 萌系小妹纸 · 2020-02-08 03:52

I solved a similar problem using a wrapper around SemaphoreSlim. In my scenario, I had some other throttling mechanisms as well, and I needed to make sure that requests didn't hit the external API too often even if request number 1 took longer to reach the API than request number 3. My solution was to use a wrapper around SemaphoreSlim that had to be released by the caller, but the actual SemaphoreSlim would not be released until a set time had passed.

public class TimeGatedSemaphore
{
    private readonly SemaphoreSlim semaphore;
    public TimeGatedSemaphore(int maxRequest, TimeSpan minimumHoldTime)
    {
        semaphore = new SemaphoreSlim(maxRequest);
        MinimumHoldTime = minimumHoldTime;
    }
    public TimeSpan MinimumHoldTime { get; }

    public async Task<IDisposable> WaitAsync()
    {
        await semaphore.WaitAsync();
        return new InternalReleaser(semaphore, Task.Delay(MinimumHoldTime));
    }

    private class InternalReleaser : IDisposable
    {
        private readonly SemaphoreSlim semaphoreToRelease;
        private readonly Task notBeforeTask;
        public InternalReleaser(SemaphoreSlim semaphoreSlim, Task dependantTask)
        {
            semaphoreToRelease = semaphoreSlim;
            notBeforeTask = dependantTask;
        }
        public void Dispose()
        {
            notBeforeTask.ContinueWith(_ => semaphoreToRelease.Release());
        }
    }
}

Example usage:

private TimeGatedSemaphore requestThrottler = new TimeGatedSemaphore(3, TimeSpan.FromSeconds(1));
public async Task<T> MyRequestSenderHelper(string endpoint)
{
    using (await requestThrottler.WaitAsync())
        return await SendRequestToAPI(endpoint);        
}