I would like to run a bunch of async tasks, with a limit on how many tasks may be pending completion at any given time.
Say you have 1000 URLs, and you only want to have 50 requests open at a time; but as soon as one request completes, you open up a connection to the next URL in the list. That way, there are always exactly 50 connections open at a time, until the URL list is exhausted.
I also want to utilize a given number of threads if possible.
I came up with an extension method, ThrottleTasksAsync, that does what I want. Is there a simpler solution already out there? I would assume that this is a common scenario.
Usage:
using System;
using System.Linq;

class Program
{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();
        Console.WriteLine("Press a key to exit...");
        Console.ReadKey(true);
    }
}
Here is the code:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class IEnumerableExtensions
{
    public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(
        this IEnumerable<Enumerable_T> enumerable,
        int maxConcurrentTasks,
        int maxDegreeOfParallelism,
        Func<Enumerable_T, Task<Result_T>> taskToRun)
    {
        var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());
        var semaphore = new SemaphoreSlim(maxConcurrentTasks);

        // Run the throttler on a separate thread.
        var t = Task.Run(() =>
        {
            foreach (var item in enumerable)
            {
                // Wait for the semaphore
                semaphore.Wait();
                blockingQueue.Add(item);
            }
            blockingQueue.CompleteAdding();
        });

        var taskList = new List<Task<Result_T>>();
        Parallel.ForEach(
            IterateUntilTrue(() => blockingQueue.IsCompleted),
            new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
            _ =>
            {
                Enumerable_T item;
                if (blockingQueue.TryTake(out item, 100))
                {
                    // Run the task
                    var task = taskToRun(item)
                        .ContinueWith(tsk =>
                        {
                            // For effect
                            Thread.Sleep(2000);
                            // Release the semaphore
                            semaphore.Release();
                            return tsk.Result;
                        });

                    // List<T> is not thread-safe, and this body runs on several threads.
                    lock (taskList)
                    {
                        taskList.Add(task);
                    }
                }
            });

        // Await all the tasks.
        return await Task.WhenAll(taskList);
    }

    static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
    {
        while (!condition()) yield return true;
    }
}
The method utilizes BlockingCollection and SemaphoreSlim to make it work. The throttler is run on one thread, and all the async tasks are run on the other thread. To achieve parallelism, I added a maxDegreeOfParallelism parameter that's passed to a Parallel.ForEach loop re-purposed as a while loop.
The old version was:
foreach (var master in ...)
{
    var details = ...;
    Parallel.ForEach(details, new ParallelOptions { MaxDegreeOfParallelism = 15 }, detail =>
    {
        // Process each detail record here
    });
    // Perform the final batch updates here
}
But the thread pool gets exhausted fast, and you can't do async/await.
Bonus:
To get around the problem in BlockingCollection where an exception is thrown in Take() when CompleteAdding() is called, I'm using the TryTake overload with a timeout. If I didn't use the timeout in TryTake, it would defeat the purpose of using a BlockingCollection, since TryTake won't block. Is there a better way? Ideally, there would be a TakeAsync method.
As requested, here's the code I ended up going with.
The work is set up in a master-detail configuration, and each master is processed as a batch. Each unit of work is queued up in this fashion:
Masters are buffered one at a time to save work for other outside processes.
The details for each master are dispatched for work via the masterTransform TransformManyBlock. A BatchedJoinBlock is also created to collect the details in one batch.
The actual work is done in the detailTransform TransformBlock, asynchronously, 150 at a time. BoundedCapacity is set to 300 to ensure that too many masters don't get buffered at the beginning of the chain, while also leaving room for enough detail records to be queued to allow 150 records to be processed at one time. The block outputs an object to its targets, because it's filtered across the links depending on whether it's a Detail or Exception.
The batchAction ActionBlock collects the output from all the batches, and performs bulk database updates, error logging, etc. for each batch.
There will be several BatchedJoinBlocks, one for each master. Since each ISourceBlock is output sequentially and each batch only accepts the number of detail records associated with one master, the batches will be processed in order. Each block only outputs one group, and is unlinked on completion. Only the last batch block propagates its completion to the final ActionBlock.
The dataflow network:
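The type-filtered links mentioned above can be illustrated in isolation. This is a reduced sketch, not the network described here: it leaves out the masterTransform and BatchedJoinBlock stages, and the Detail class, the integer ids, and the simulated failure are hypothetical placeholders. It assumes System.Threading.Tasks.Dataflow is available (on .NET Framework that means the NuGet package of the same name).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class FilteredLinks
{
    // Hypothetical stand-in for a detail record.
    public sealed class Detail { public int Id { get; set; } }

    public static async Task<(List<Detail> Details, List<Exception> Errors)> RunAsync(IEnumerable<int> ids)
    {
        // Outputs object so that one block can emit either a Detail or an Exception.
        var detailTransform = new TransformBlock<int, object>(async id =>
        {
            try
            {
                await Task.Delay(1); // simulated async work
                if (id % 5 == 0) throw new InvalidOperationException("bad detail " + id);
                return (object)new Detail { Id = id };
            }
            catch (Exception ex)
            {
                return ex; // failures travel down the pipeline as data
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });

        var details = new List<Detail>();
        var errors = new List<Exception>();
        // Each ActionBlock processes one message at a time by default, so the lists need no lock.
        var detailSink = new ActionBlock<object>(o => details.Add((Detail)o));
        var errorSink = new ActionBlock<object>(o => errors.Add((Exception)o));

        // Every output must match some predicate, otherwise the source block stalls on it.
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        detailTransform.LinkTo(detailSink, link, o => o is Detail);
        detailTransform.LinkTo(errorSink, link, o => o is Exception);

        foreach (var id in ids)
            await detailTransform.SendAsync(id); // waits asynchronously while the block is full
        detailTransform.Complete();

        await Task.WhenAll(detailSink.Completion, errorSink.Completion);
        return (details, errors);
    }
}
```

SendAsync is used instead of Post because a block with a BoundedCapacity declines new items when it is full, and Post would simply return false in that case.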
The following simple solution has surfaced many times here on SO. It doesn't use blocking code and doesn't create threads explicitly, so it scales very well:
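A minimal sketch of that pattern, assuming it is built on SemaphoreSlim.WaitAsync and Task.WhenAll. The ThrottleAsync helper and its parameter names are hypothetical and not part of any library.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class Throttling
{
    // Runs `work` for every item with at most `maxPending` operations in flight.
    // Results come back in the same order as the input items.
    public static async Task<TResult[]> ThrottleAsync<T, TResult>(
        IEnumerable<T> items, int maxPending, Func<T, Task<TResult>> work)
    {
        using (var semaphore = new SemaphoreSlim(maxPending))
        {
            // Wraps one operation so that its slot is released even if it fails.
            async Task<TResult> RunOneAsync(T item)
            {
                try { return await work(item); }
                finally { semaphore.Release(); }
            }

            var tasks = new List<Task<TResult>>();
            foreach (var item in items)
            {
                // Asynchronous wait: no thread is blocked while all slots are taken.
                await semaphore.WaitAsync();
                tasks.Add(RunOneAsync(item));
            }

            return await Task.WhenAll(tasks);
        }
    }
}
```

With 1000 URLs and maxPending set to 50, the next request starts as soon as a slot frees up, and the caller never blocks a thread while waiting.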
The thing is, the processing of the downloaded data should be done on a different pipeline, with a different level of parallelism, especially if the processing is CPU-bound.
E.g., you'd probably want to have 4 threads concurrently doing the data processing (the number of CPU cores), and up to 50 pending requests for more data (which do not use threads at all). AFAICT, this is not what your code is currently doing.
That's where TPL Dataflow or Rx may come in handy as a preferred solution. Yet it is certainly possible to implement something like this with plain TPL. Note that in such an implementation the only blocking code is the actual data processing inside Task.Run.
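One way the plain-TPL variant could look is sketched below, under the assumption that two independent SemaphoreSlim gates are enough: one caps pending downloads, the other caps concurrent CPU-bound processing. The Pipeline class, RunAsync, and the downloadAsync and process delegates are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class Pipeline
{
    public static async Task<TOut[]> RunAsync<TIn, TData, TOut>(
        IEnumerable<TIn> items, int maxDownloads, int maxProcessors,
        Func<TIn, Task<TData>> downloadAsync, Func<TData, TOut> process)
    {
        using (var downloadGate = new SemaphoreSlim(maxDownloads))
        using (var processGate = new SemaphoreSlim(maxProcessors))
        {
            async Task<TOut> HandleAsync(TIn item)
            {
                TData data;
                try { data = await downloadAsync(item); } // I/O-bound, uses no thread while pending
                finally { downloadGate.Release(); }       // frees the slot for the next download

                await processGate.WaitAsync();
                try
                {
                    // The only blocking work: CPU-bound processing on a pool thread.
                    return await Task.Run(() => process(data));
                }
                finally { processGate.Release(); }
            }

            var tasks = new List<Task<TOut>>();
            foreach (var item in items)
            {
                await downloadGate.WaitAsync(); // at most maxDownloads requests pending
                tasks.Add(HandleAsync(item));
            }

            return await Task.WhenAll(tasks);
        }
    }
}
```

Calling it with maxDownloads of 50 and maxProcessors of 4 matches the numbers in the example above. This sketch does not bound the number of downloaded items waiting for a processing slot, which a real implementation may want to limit.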
As suggested, use TPL Dataflow.
A TransformBlock<TInput, TOutput> may be what you're looking for.
You define a MaxDegreeOfParallelism to limit how many strings can be transformed (i.e., how many URLs can be downloaded) in parallel. You then post URLs to the block, and when you're done you tell the block you're done adding items and you fetch the responses.
Note: The TransformBlock buffers both its input and output. Why, then, do we need to link it to a BufferBlock?
Because the TransformBlock won't complete until all items (HttpResponse) have been consumed, and await downloader.Completion would hang. Instead, we let the downloader forward all its output to a dedicated buffer block; then we wait for the downloader to complete, and inspect the buffer block.
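The arrangement described above, a TransformBlock linked to a BufferBlock, might look roughly like this. It is a sketch under assumptions: the DataflowDownloader class, DownloadAllAsync, and the fetchAsync delegate are hypothetical names, the delegate stands in for the actual HTTP call, and System.Threading.Tasks.Dataflow is assumed to be available (a NuGet package on .NET Framework).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class DataflowDownloader
{
    public static async Task<IList<TOut>> DownloadAllAsync<TIn, TOut>(
        IEnumerable<TIn> inputs, int maxParallel, Func<TIn, Task<TOut>> fetchAsync)
    {
        // At most maxParallel invocations of fetchAsync are in flight at once.
        var downloader = new TransformBlock<TIn, TOut>(
            fetchAsync,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallel });

        // Drains the downloader's output so that its Completion task can finish.
        var buffer = new BufferBlock<TOut>();
        downloader.LinkTo(buffer);

        foreach (var input in inputs)
            downloader.Post(input); // unbounded block, so Post always accepts

        downloader.Complete();       // no more input
        await downloader.Completion; // all output has been forwarded to the buffer

        IList<TOut> results;
        if (!buffer.TryReceiveAll(out results))
            results = new List<TOut>(); // nothing was posted
        return results;
    }
}
```

By default the TransformBlock emits results in input order, so the returned list lines up with the inputs even though downloads finish at different times.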