Optimizing for fire & forget using async/await and

2019-02-25 18:00发布

I have about 5 million items to update. I don't really care about the response (A response would be nice to have so I can log it, but I don't want a response if that will cost me time.) Having said that, is this code optimized to run as fast as possible? If there are 5 million items, would I run the risk of getting any task cancelled or timeout errors? I get about 1 or 2 responses back every second.

var tasks = items.Select(async item =>
{
    await Update(CreateUrl(item));
}).ToList();

if (tasks.Any())
{
    await Task.WhenAll(tasks);
}                

private async Task<HttpResponseMessage> Update(string url)
{
    var client = new HttpClient();
    var response = await client.SendAsync(url).ConfigureAwait(false);    
    //log response.
}

UPDATE: I am actually getting TaskCanceledExceptions. Did my system run out of threads? What could I do to avoid this?

3条回答
你好瞎i
2楼-- · 2019-02-25 18:40

A much better approach would be to use TPL Dataflow's ActionBlock with MaxDegreeOfParallelism and a single HttpClient:

Task UpdateAll(IEnumerable<Item> items)
{
    var block = new ActionBlock<Item>(
        item => UpdateAsync(CreateUrl(item)), 
        new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 1000});

    foreach (var item in items)
    {
        block.Post(item);
    }

    block.Complete();
    return block.Completion;
}

async Task UpdateAsync(string url)
{
    var response = await _client.SendAsync(url).ConfigureAwait(false);    
    Console.WriteLine(response.StatusCode);
}
  • A single HttpClient can be used concurrently for multiple requests, and so it's much better to only create and disposing a single instance instead of 5 million.
  • There are numerous problems in firing so many request at the same time: The machine's network stack, the target web site, timeouts and so forth. The ActionBlock caps that number with the MaxDegreeOfParallelism (which you should test and optimize for your specific case). It's important to note that TPL may choose a lower number when it deems it to be appropriate.
  • When you have a single async call at the end of an async method or lambda expression, it's better for performance to remove the redundant async-await and just return the task (i.e return block.Completion;)
  • Complete will notify the ActionBlock to not accept any more items, but finish processing items it already has. When it's done the Completion task will be done so you can await it.
查看更多
【Aperson】
3楼-- · 2019-02-25 18:50

I suspect you are suffering from outgoing connection management preventing large numbers of simultaneous connections to the same domain. The answers given in this extensive Q+A might give you some avenues to investigate.

What is limiting the # of simultaneous connections my ASP.NET application can make to a web service?

In terms of your code structure, I'd personally try and use a dynamic pool of connections. You know that you cant actually get 5m connections simultaneously so trying to attempt it will just fail to work - you may as well deal with a reasonable and configured limit of (for instance) 20 connections and use them in a pool. In this way you can tune up or down.

alternatively you could investigate HTTP Pipelining (which I've not used) which is intended specifically for the job you are doing (batching up Http requests). http://en.wikipedia.org/wiki/HTTP_pipelining

查看更多
Deceive 欺骗
4楼-- · 2019-02-25 18:57

You method will kick off all tasks at the same time, which may not be what you want. There wouldn't be any threads involved because with async operations There is no thread, but there may be number of concurrent connection limits.

There may be better tools to do this but if you want to use async/await one option is to use Stephen Toub's ForEachAsync as documented in this article. It allows you to control how many simultaneous operations you want to execute, so you don't overrun your connection limit.

Here it is from the article:

public static class Extensions
{
     public static async Task ExecuteInPartition<T>(IEnumerator<T> partition, Func<T, Task> body)
     {
         using (partition)
             while (partition.MoveNext())
                await body(partition.Current);
     }

     public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
     {      
         return Task.WhenAll(
             from partition in Partitioner.Create(source).GetPartitions(dop)
                  select ExecuteInPartition(partition, body));
     }
}

Usage:

public async Task UpdateAll()
{
    // Allow for 100 concurrent Updates
    await items.ForEachAsync(100, async t => await Update(t));  
}
查看更多
登录 后发表回答