How to limit the amount of concurrent async I/O operations?

Posted 2019-01-01 12:44

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
urls.AsParallel().ForAll(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
});

The problem is that this starts 1000+ web requests simultaneously. Is there an easy way to limit the number of concurrent async HTTP requests, so that no more than 20 web pages are being downloaded at any given time? What is the most efficient way to do it?

12 answers
其实,你不懂
#2 · 2019-01-01 13:03

Just a more succinct version of https://stackoverflow.com/a/10810730/1186165:

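// Work items are passed as factories (Func<Task>) so that none of them
// starts before it has acquired a slot from the semaphore.
static async Task WhenAll(IEnumerable<Func<Task>> taskFactories, int maxConcurrency) {
    using (var guard = new SemaphoreSlim(initialCount: maxConcurrency)) {
        await Task.WhenAll(taskFactories.Select(async factory => {
            await guard.WaitAsync();
            try {
                await factory();
            } finally {
                guard.Release(); // free the slot even if the task faults
            }
        }));
    }
}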
谁念西风独自凉
#3 · 2019-01-01 13:05

Unfortunately, the .NET Framework is missing some of the most important combinators for orchestrating parallel async tasks. There is nothing like this built in.

Look at the AsyncSemaphore class built by the highly respected Stephen Toub. What you want is called a semaphore, and you need an async version of it.
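
To make the idea of an async semaphore concrete, here is a minimal sketch. It is not Stephen Toub's actual AsyncSemaphore code: the class name SimpleAsyncSemaphore and its members are made up for illustration, and it assumes .NET Framework 4.6 or later (for Task.CompletedTask). Callers that cannot get a slot receive a task that completes later, so no thread is blocked while waiting.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Illustrative sketch only; SimpleAsyncSemaphore is a hypothetical name.
public sealed class SimpleAsyncSemaphore
{
    private readonly object _gate = new object();
    private readonly Queue<TaskCompletionSource<bool>> _waiters =
        new Queue<TaskCompletionSource<bool>>();
    private int _available;

    public SimpleAsyncSemaphore(int initialCount)
    {
        if (initialCount < 0) throw new ArgumentOutOfRangeException(nameof(initialCount));
        _available = initialCount;
    }

    // Returns an already-completed task if a slot is free; otherwise returns
    // a task that completes when another caller releases a slot.
    public Task WaitAsync()
    {
        lock (_gate)
        {
            if (_available > 0)
            {
                _available--;
                return Task.CompletedTask;
            }
            var waiter = new TaskCompletionSource<bool>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            _waiters.Enqueue(waiter);
            return waiter.Task;
        }
    }

    // Hands the slot to the oldest waiter if there is one; otherwise
    // returns the slot to the pool.
    public void Release()
    {
        TaskCompletionSource<bool> next = null;
        lock (_gate)
        {
            if (_waiters.Count > 0) next = _waiters.Dequeue();
            else _available++;
        }
        next?.SetResult(true);
    }
}
```

Usage would be `await semaphore.WaitAsync();` before each request and `semaphore.Release();` in a finally block afterwards. On .NET 4.5 and later, SemaphoreSlim.WaitAsync() covers the same need, so a hand-rolled class like this is mainly useful for understanding the mechanism.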

琉璃瓶的回忆
#4 · 2019-01-01 13:07

Theo Yaung's example is nice, but here is a variant that does not keep a list of waiting tasks.

class SomeChecker
{
    private const int ThreadCount = 20;
    private CountdownEvent _countdownEvent;
    private SemaphoreSlim _throttler;

    public Task Check(IList<string> urls)
    {
        _countdownEvent = new CountdownEvent(urls.Count);
        _throttler = new SemaphoreSlim(ThreadCount);

        return Task.Run( // keep the UI thread free
            async () =>
            {
                foreach (var url in urls)
                {
                    // wait asynchronously until a slot is free
                    await _throttler.WaitAsync();
                    _ = ProcessUrl(url); // deliberately NOT awaited
                }
                // instead of await Task.WhenAll(allTasks);
                // note: this blocks one thread-pool thread until all URLs are done
                _countdownEvent.Wait();
            });
    }

    private async Task ProcessUrl(string url)
    {
        try
        {
            using (var client = new WebClient())
            {
                var page = await client.DownloadStringTaskAsync(new Uri(url));
                ProcessResult(page);
            }
        }
        finally
        {
            // runs on success and on failure, so the slot is always returned
            _throttler.Release();
            _countdownEvent.Signal();
        }
    }

    private void ProcessResult(string page){/*....*/}
}
君临天下
#5 · 2019-01-01 13:07

Use MaxDegreeOfParallelism, which is an option you can specify in Parallel.ForEach():

var options = new ParallelOptions { MaxDegreeOfParallelism = 20 };

Parallel.ForEach(urls, options,
    url =>
        {
            var client = new HttpClient();
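            // Parallel.ForEach does not await async work, so block on the result;
            // otherwise the body returns immediately and the limit has no effect.
            var html = client.GetStringAsync(url).GetAwaiter().GetResult();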
            // do stuff with html
        });
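
Parallel.ForEach takes a synchronous delegate, so it cannot throttle work that is awaited. On .NET 6 or later (an assumption about your target framework, since this API did not exist when the question was asked), Parallel.ForEachAsync accepts an async body and honors MaxDegreeOfParallelism. A minimal sketch follows; ThrottledDownloader, DownloadAllAsync and the fetchAsync delegate are hypothetical names, with fetchAsync standing in for something like url => httpClient.GetStringAsync(url):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ThrottledDownloader
{
    // fetchAsync is a hypothetical stand-in for the real download call.
    public static async Task DownloadAllAsync(
        IEnumerable<string> urls,
        Func<string, Task<string>> fetchAsync,
        int maxConcurrency = 20)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxConcurrency };

        // Requires .NET 6 or later. The async body is awaited, so at most
        // maxConcurrency invocations are in flight at any time.
        await Parallel.ForEachAsync(urls, options, async (url, cancellationToken) =>
        {
            string html = await fetchAsync(url);
            // do stuff with html
        });
    }
}
```

Unlike the blocking variant, no thread is held while a request is in flight.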
素衣白纱
#6 · 2019-01-01 13:08

If you have an IEnumerable (e.g. a sequence of URL strings) and you want to run an I/O-bound operation on each item concurrently (e.g. make an async HTTP request), and optionally cap the maximum number of concurrent I/O requests at run time, here is how you can do that. This approach does not tie up thread-pool threads: it uses SemaphoreSlim to limit the number of concurrent I/O requests, much like a sliding window. When one request completes, it leaves the semaphore and the next one gets in.

usage: await ForEachAsync(urlStrings, YourAsyncFunc, optionalMaxDegreeOfConcurrency);

public static Task ForEachAsync<TIn>(
    IEnumerable<TIn> inputEnumerable,
    Func<TIn, Task> asyncProcessor,
    int? maxDegreeOfParallelism = null)
{
    // DefaultMaxDegreeOfParallelism is assumed to be a constant defined elsewhere in the class.
    int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
    SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);

    // Each item waits for a free slot, runs, and releases the slot when done.
    IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
    {
        await throttler.WaitAsync().ConfigureAwait(false);
        try
        {
            await asyncProcessor(input).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });

    // Task.WhenAll enumerates the lazy sequence once, which starts every wrapper task.
    return Task.WhenAll(tasks);
}
不再属于我。
#7 · 2019-01-01 13:09

Old question, new answer. @vitidev had a block of code that was reused almost intact in a project I reviewed. After I discussed it with a few colleagues, one asked, "Why don't you just use the built-in TPL methods?" ActionBlock looks like the winner there: https://msdn.microsoft.com/en-us/library/hh194773(v=vs.110).aspx. I probably won't end up changing any existing code, but I will definitely look to adopt this NuGet package and reuse Mr. Softy's best practice for throttled parallelism.
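
For reference, a throttled ActionBlock could look roughly like the sketch below. It assumes the System.Threading.Tasks.Dataflow NuGet package is referenced (on .NET Framework); DataflowThrottle, ProcessAllAsync and the processAsync delegate are hypothetical names, with processAsync standing in for whatever downloads and handles one URL:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowThrottle
{
    // processAsync is a hypothetical stand-in for the per-URL work.
    public static async Task ProcessAllAsync(
        IEnumerable<string> urls,
        Func<string, Task> processAsync,
        int maxConcurrency = 20)
    {
        // The block runs at most maxConcurrency invocations of processAsync at once.
        var block = new ActionBlock<string>(
            processAsync,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxConcurrency });

        foreach (var url in urls)
        {
            // The input queue is unbounded by default, so Post does not wait;
            // it returns false only if the block is no longer accepting items.
            block.Post(url);
        }

        block.Complete();       // signal that no more items will be posted
        await block.Completion; // completes once every queued item has been processed
    }
}
```

If processAsync throws, the block faults and awaiting Completion rethrows the exception, so a try/catch inside the delegate is worth considering when one bad URL should not stop the rest.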
