I have an enumeration of items (RunData.Demand), each representing some work involving calling an API over HTTP. It works great if I just foreach through it all and call the API during each iteration. However, each iteration takes a second or two, so I'd like to run 2-3 threads and divide up the work between them. Here's what I'm doing:
    ThreadPool.SetMaxThreads(2, 5); // Trying to limit the amount of threads
    var tasks = RunData.Demand
        .Select(service => Task.Run(async delegate
        {
            var availabilityResponse = await client.QueryAvailability(service);
            // Do some other stuff, not really important
        }));
    await Task.WhenAll(tasks);
The client.QueryAvailability call basically calls an API using the HttpClient class:
    public async Task<QueryAvailabilityResponse> QueryAvailability(QueryAvailabilityMultidayRequest request)
    {
        var response = await client.PostAsJsonAsync("api/queryavailabilitymultiday", request);
        if (response.IsSuccessStatusCode)
        {
            return await response.Content.ReadAsAsync<QueryAvailabilityResponse>();
        }
        throw new HttpException((int) response.StatusCode, response.ReasonPhrase);
    }
This works great for a while, but eventually things start timing out. If I set the HttpClient Timeout to an hour, then I start getting weird internal server errors.
What I started doing was setting a Stopwatch within the QueryAvailability method to see what was going on.
What's happening is that all 1200 items in RunData.Demand are being created at once and all 1200 await client.PostAsJsonAsync calls are being made immediately. It appears it then uses the 2 threads to slowly check back on the tasks, so towards the end I have tasks that have been waiting for 9 or 10 minutes.
Here's the behavior I would like:
I'd like to create the 1,200 tasks, then run them 3-4 at a time as threads become available. I do not want to queue up 1,200 HTTP calls immediately.
Is there a good way to go about doing this?
While the Dataflow library is great, I think it's a bit heavy when not using block composition. I would tend to use something like the extension method below.
Also, unlike the Partitioner method, this runs the async methods on the calling context - the caveat being that if your code is not truly async, or takes a 'fast path', then it will effectively run synchronously since no threads are explicitly created.
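The extension method this answer refers to did not survive in the text above. As a stand-in, here is a minimal sketch of what such a helper could look like. It is not the original answer's code: the names ThrottledAsyncExtensions, ForEachThrottledAsync, and maxConcurrency are hypothetical. It starts each operation on the calling context (no Task.Run) and waits for one to finish whenever the limit is reached.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper, written for illustration only.
public static class ThrottledAsyncExtensions
{
    // Runs `body` for every item, keeping at most `maxConcurrency`
    // operations in flight at any one time.
    public static async Task ForEachThrottledAsync<T>(
        this IEnumerable<T> source, int maxConcurrency, Func<T, Task> body)
    {
        if (maxConcurrency < 1)
            throw new ArgumentOutOfRangeException(nameof(maxConcurrency));

        var inFlight = new List<Task>(maxConcurrency);
        foreach (var item in source)
        {
            // body(item) begins executing on the calling context; no thread
            // is created here. If body never truly awaits, this whole loop
            // runs synchronously (the caveat mentioned above).
            inFlight.Add(body(item));

            if (inFlight.Count >= maxConcurrency)
            {
                // Wait for any one operation to finish before starting more.
                var finished = await Task.WhenAny(inFlight);
                inFlight.Remove(finished);
                await finished; // rethrows if that operation failed
            }
        }

        // Drain whatever is still running.
        await Task.WhenAll(inFlight);
    }
}
```

With the question's code, a call might look like RunData.Demand.ForEachThrottledAsync(3, service => client.QueryAvailability(service)). One simplification to be aware of: if an operation fails mid-loop, the exception propagates right away and the remaining in-flight tasks are not awaited.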
Old question, but I would like to propose an alternative lightweight solution using the SemaphoreSlim class. Just reference System.Threading.
The semaphore acts as a locking mechanism. You can only enter the semaphore by calling Wait (or WaitAsync), which subtracts one from the count; when the count is zero, the caller waits until a slot is free. Calling Release adds one to the count.
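A minimal sketch of the SemaphoreSlim approach, under a few assumptions: the class name SemaphoreThrottle and the method RunThrottledAsync are hypothetical, and `work` stands in for whatever per-item call you make (client.QueryAvailability in the question). All tasks are created up front, but each one must acquire the semaphore before doing any work, so only maxConcurrency HTTP calls are outstanding at a time.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, written for illustration only.
public static class SemaphoreThrottle
{
    public static async Task RunThrottledAsync<T>(
        IEnumerable<T> items, int maxConcurrency, Func<T, Task> work)
    {
        // The initial count is the number of operations allowed at once.
        using (var semaphore = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = items.Select(async item =>
            {
                // Asynchronously wait for a free slot (count - 1).
                await semaphore.WaitAsync();
                try
                {
                    await work(item);
                }
                finally
                {
                    // Always give the slot back (count + 1), even on failure.
                    semaphore.Release();
                }
            }).ToList(); // materialize so each task is created exactly once

            await Task.WhenAll(tasks);
        }
    }
}
```

The try/finally matters: without it, one failed request would permanently consume a slot and the remaining items could stall.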
As I always recommend, what you need is TPL Dataflow (to install: Install-Package Microsoft.Tpl.Dataflow). You create an ActionBlock with an action to perform on each item, set MaxDegreeOfParallelism for throttling, then start posting into it and await its completion.
You're using async HTTP calls, so limiting the number of threads will not help (nor will ParallelOptions.MaxDegreeOfParallelism in Parallel.ForEach, as one of the answers suggests). Even a single thread can initiate all requests and process the results as they arrive. One way to solve it is to use TPL Dataflow.
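The ActionBlock approach described in these answers can be sketched roughly as follows. This is an illustration, not the answerers' original code: DataflowThrottle and RunThrottledAsync are hypothetical names, and the block assumes the System.Threading.Tasks.Dataflow NuGet package (the current home of the TPL Dataflow types) is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // requires the TPL Dataflow package

// Hypothetical helper, written for illustration only.
public static class DataflowThrottle
{
    public static async Task RunThrottledAsync<T>(
        IEnumerable<T> items, int maxConcurrency, Func<T, Task> work)
    {
        // The block invokes `work` for each posted item, with at most
        // `maxConcurrency` invocations running at the same time.
        var block = new ActionBlock<T>(work, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxConcurrency
        });

        // Posting only queues the items; it does not start 1,200 HTTP calls.
        foreach (var item in items)
        {
            block.Post(item);
        }

        // Signal that no more items will arrive, then wait for the queue to drain.
        block.Complete();
        await block.Completion;
    }
}
```

Note that if `work` throws, the block faults: items still queued are discarded and the exception surfaces when awaiting Completion. Catch inside `work` if one failed request should not stop the rest.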
Another nice solution is to divide the source IEnumerable into partitions and process items in each partition sequentially as described in this blog post:
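The blog post's code is not reproduced in the text above, so the following is only a sketch of the partitioning idea, using the framework's Partitioner class. The names PartitionedAsyncExtensions, ForEachPartitionedAsync, and partitionCount are hypothetical. Each partition is drained sequentially by its own worker task, so the number of partitions is the concurrency limit.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper, written for illustration only.
public static class PartitionedAsyncExtensions
{
    public static Task ForEachPartitionedAsync<T>(
        this IEnumerable<T> source, int partitionCount, Func<T, Task> body)
    {
        // Split the source into `partitionCount` enumerators that pull
        // items from the shared source on demand.
        var partitions = Partitioner.Create(source).GetPartitions(partitionCount);

        // One worker per partition; each worker awaits its items one at a time.
        var workers = partitions.Select(partition => Task.Run(async () =>
        {
            using (partition)
            {
                while (partition.MoveNext())
                {
                    await body(partition.Current);
                }
            }
        }));

        // Completes when every partition has been fully processed.
        return Task.WhenAll(workers);
    }
}
```

Unlike the calling-context approach, this one uses Task.Run, so each worker starts on a thread-pool thread. If `body` throws, only that worker stops; the other partitions keep going and the exception surfaces from the returned task.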