How to correctly queue up tasks to run in C#

Published 2020-05-22 10:11

I have an enumeration of items (RunData.Demand), each representing some work involving calling an API over HTTP. It works great if I just foreach through it all and call the API during each iteration. However, each iteration takes a second or two so I'd like to run 2-3 threads and divide up the work between them. Here's what I'm doing:

ThreadPool.SetMaxThreads(2, 5); // Trying to limit the number of threads
var tasks = RunData.Demand
   .Select(service => Task.Run(async delegate
   {
      var availabilityResponse = await client.QueryAvailability(service);
      // Do some other stuff, not really important
   }));

await Task.WhenAll(tasks);

The client.QueryAvailability call basically calls an API using the HttpClient class:

public async Task<QueryAvailabilityResponse> QueryAvailability(QueryAvailabilityMultidayRequest request)
{
   var response = await client.PostAsJsonAsync("api/queryavailabilitymultiday", request);

   if (response.IsSuccessStatusCode)
   {
      return await response.Content.ReadAsAsync<QueryAvailabilityResponse>();
   }

   throw new HttpException((int) response.StatusCode, response.ReasonPhrase);
}

This works great for a while, but eventually things start timing out. If I set the HttpClient Timeout to an hour, then I start getting weird internal server errors.

What I started doing was setting a Stopwatch within the QueryAvailability method to see what was going on.

What's happening is that all 1,200 tasks for the items in RunData.Demand are created at once, and all 1,200 await client.PostAsJsonAsync calls are started immediately. It then appears to use the 2 threads to slowly check back on the tasks, so towards the end I have tasks that have been waiting for 9 or 10 minutes.

Here's the behavior I would like:

I'd like to create the 1,200 tasks, then run them 3-4 at a time as threads become available. I do not want to queue up 1,200 HTTP calls immediately.

Is there a good way to go about doing this?

4 Answers
神经病院院长
#2 · 2020-05-22 10:35

While the Dataflow library is great, I think it's a bit heavy when not using block composition. I would tend to use something like the extension method below.

Also, unlike the Partitioner method, this runs the async methods on the calling context. The caveat is that if your code is not truly async, or takes a 'fast path', it will effectively run synchronously, since no threads are explicitly created.

public static async Task RunParallelAsync<T>(this IEnumerable<T> items, Func<T, Task> asyncAction, int maxParallel)
{
    var tasks = new List<Task>();

    foreach (var item in items)
    {
        tasks.Add(asyncAction(item));

        if (tasks.Count < maxParallel)
            continue;

        var notCompleted = tasks.Where(t => !t.IsCompleted).ToList();

        if (notCompleted.Count >= maxParallel)
            await Task.WhenAny(notCompleted);
    }

    await Task.WhenAll(tasks);
}
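
For reference, applied to the question's data the extension method could be called roughly like this (a sketch assuming the client and RunData.Demand from the question):

// Run at most 3 of the availability queries concurrently.
await RunData.Demand.RunParallelAsync(async service =>
{
    var availabilityResponse = await client.QueryAvailability(service);
    // Do some other stuff, not really important
}, maxParallel: 3);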
再贱就再见
#3 · 2020-05-22 10:39

Old question, but I would like to propose an alternative lightweight solution using the SemaphoreSlim class (from the System.Threading namespace).

SemaphoreSlim sem = new SemaphoreSlim(4, 4);

foreach (var service in RunData.Demand)
{
    await sem.WaitAsync();
    Task t = Task.Run(async () =>
    {
        var availabilityResponse = await client.QueryAvailability(service);
        // do your other stuff here with the result of QueryAvailability
    });

    // Release the semaphore slot once this task completes.
    t.ContinueWith(_ => sem.Release());
}

The semaphore acts as a throttling mechanism. You can only enter the semaphore by calling Wait (or WaitAsync), which subtracts one from the count; calling Release adds one back.
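
Note that the loop above only waits to start each task, not for the last ones to finish. A minimal sketch of one way to wait for them, assuming the semaphore above with a capacity of 4, is to re-acquire every slot after the loop:

// Each task releases its slot when it finishes, so once all 4 slots
// can be re-acquired, every queued request has completed.
for (int i = 0; i < 4; i++)
{
    await sem.WaitAsync();
}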

放我归山
#4 · 2020-05-22 10:46

As I always recommend: what you need is TPL Dataflow (to install: Install-Package Microsoft.Tpl.Dataflow; in newer projects the package is named System.Threading.Tasks.Dataflow).

You create an ActionBlock with an action to perform on each item. Set MaxDegreeOfParallelism for throttling. Start posting into it and await its completion:

var block = new ActionBlock<QueryAvailabilityMultidayRequest>(async service => 
{
    var availabilityResponse = await client.QueryAvailability(service);
    // ...
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

foreach (var service in RunData.Demand)
{
    block.Post(service);
}

block.Complete();
await block.Completion;
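
If you also want to avoid queueing all 1,200 items inside the block up front, Dataflow lets you bound the block's input buffer. A sketch of that variant (same block body as above, using BoundedCapacity together with SendAsync, which asynchronously waits for room):

var block = new ActionBlock<QueryAvailabilityMultidayRequest>(async service =>
{
    var availabilityResponse = await client.QueryAvailability(service);
    // ...
},
new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 4,
    BoundedCapacity = 10   // at most 10 items buffered in the block at a time
});

foreach (var service in RunData.Demand)
{
    // SendAsync waits until the block has room instead of buffering everything.
    await block.SendAsync(service);
}

block.Complete();
await block.Completion;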
唯我独甜
#5 · 2020-05-22 10:56

You're using async HTTP calls, so limiting the number of threads will not help (nor will ParallelOptions.MaxDegreeOfParallelism in Parallel.ForEach as one of the answers suggests). Even a single thread can initiate all requests and process the results as they arrive.

One way to solve it is to use TPL Dataflow.

Another nice solution is to divide the source IEnumerable into partitions and process items in each partition sequentially as described in this blog post:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current);
        }));
}
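
Applied to the question's data, that might look roughly like this (a sketch assuming the client and RunData.Demand from the question; Partitioner lives in System.Collections.Concurrent):

// Process the demand items with a degree of parallelism of 4.
await RunData.Demand.ForEachAsync(4, async service =>
{
    var availabilityResponse = await client.QueryAvailability(service);
    // Do some other stuff, not really important
});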