How to throttle multiple asynchronous tasks?

Posted 2019-02-15 17:12

I have some code of the following form:

static async Task DoSomething(int n) 
{
  ...
}

static void RunThreads(int totalThreads, int throttle) 
{
  var tasks = new List<Task>();
  for (var n = 0; n < totalThreads; n++)
  {
    var task = DoSomething(n);
    tasks.Add(task);
  }
  Task.WhenAll(tasks).Wait(); // all threads must complete
}

Trouble is, if I don't throttle the threads, things start falling apart. Now, I want to launch a maximum of throttle threads, and only start the new thread when an old one is complete. I've tried a few approaches and none so far has worked. Problems I have encountered include:

  • The tasks collection must be fully populated with all tasks, whether active or awaiting execution, otherwise the final .Wait() call only looks at the threads that it started with.
  • Chaining the execution seems to require use of Task.Run() or the like. But I need a reference to each task from the outset, and instantiating a task seems to kick it off automatically, which is what I don't want.

How to do this?

5 Answers
疯言疯语
Reply #2 · 2019-02-15 17:35

Here are some extension method variations that build on Sriram Sakthivel's answer.

In the usage example, calls to DoSomething are being wrapped in an explicitly cast closure to allow passing arguments.

public static async Task RunMyThrottledTasks()
{
    var myArgsSource = new[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
    await myArgsSource
        .Select(a => (Func<Task<object>>)(() => DoSomething(a)))
        .Throttle(2);
}

public static async Task<object> DoSomething(int arg)
{
    // Await some async calls that need arg..
    // ..then return result async Task..
    return new object();
}

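public static async Task<IEnumerable<T>> Throttle<T>(this IEnumerable<Func<Task<T>>> toRun, int throttleTo)
{
    var running = new List<Task<T>>(throttleTo);
    // No capacity hint: calling toRun.Count() would enumerate the source twice.
    var completed = new List<Task<T>>();
    foreach(var taskToRun in toRun)
    {
        running.Add(taskToRun());
        if(running.Count == throttleTo)
        {
            var comTask = await Task.WhenAny(running);
            running.Remove(comTask);
            completed.Add(comTask);
        }
    }
    // Wait for the tasks still in flight once the source is exhausted.
    await Task.WhenAll(running);
    completed.AddRange(running);
    // Results come back in completion order, not input order.
    return completed.Select(t => t.Result);
}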

public static async Task Throttle(this IEnumerable<Func<Task>> toRun, int throttleTo)
{
    var running = new List<Task>(throttleTo);
    foreach(var taskToRun in toRun)
    {
        running.Add(taskToRun());
        if(running.Count == throttleTo)
        {
            var comTask = await Task.WhenAny(running);
            running.Remove(comTask);
        }
    }
    // Wait for the tasks still in flight once the source is exhausted.
    await Task.WhenAll(running);
}
倾城 Initia
Reply #3 · 2019-02-15 17:48

Stephen Toub gives the following throttling example in his document The Task-based Asynchronous Pattern.

const int CONCURRENCY_LEVEL = 15;
Uri [] urls = …;
int nextIndex = 0;
var imageTasks = new List<Task<Bitmap>>();
while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length)
{
    imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
    nextIndex++;
}

while(imageTasks.Count > 0)
{
    try
    {
        Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
        imageTasks.Remove(imageTask);

        Bitmap image = await imageTask;
        panel.AddImage(image);
    }
    catch(Exception exc) { Log(exc); }

    if (nextIndex < urls.Length)
    {
        imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
        nextIndex++;
    }
}
Emotional °昔
Reply #4 · 2019-02-15 17:49

Microsoft's Reactive Extensions (Rx) - NuGet "Rx-Main" - has this problem sorted very nicely.

Just do this:

static void RunThreads(int totalThreads, int throttle) 
{
    Observable
        .Range(0, totalThreads)
        .Select(n => Observable.FromAsync(() => DoSomething(n)))
        .Merge(throttle)
        .Wait();
}

Job done.

狗以群分
Reply #5 · 2019-02-15 17:50

If I understand correctly, you want to start at most the number of tasks given by the throttle parameter, and wait for them to finish before starting the next ones.

To wait for all started tasks to complete before starting new tasks, use the following implementation.

static async Task RunThreads(int totalThreads, int throttle)
{
    var tasks = new List<Task>();
    for (var n = 0; n < totalThreads; n++)
    {
        var task = DoSomething(n);
        tasks.Add(task);

        if (tasks.Count == throttle)
        {
            await Task.WhenAll(tasks);
            tasks.Clear();
        }
    }
    await Task.WhenAll(tasks); // wait for remaining
}

To start a new task as soon as a running one completes, use the following code:

static async Task RunThreads(int totalThreads, int throttle)
{
    var tasks = new List<Task>();
    for (var n = 0; n < totalThreads; n++)
    {
        var task = DoSomething(n);
        tasks.Add(task);

        if (tasks.Count == throttle)
        {
            var completed = await Task.WhenAny(tasks);
            tasks.Remove(completed);
        }
    }
    await Task.WhenAll(tasks); // all threads must complete
}
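
A different technique from the WhenAny loop above, sketched here only as an alternative, is to gate each call with a SemaphoreSlim. The class name SemaphoreThrottle and the helper RunThrottledAsync are hypothetical names chosen for illustration, and the work delegate stands in for DoSomething. Under those assumptions, every wrapper task exists from the outset, so the final WhenAll sees all of them, while at most throttle bodies run at once:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreThrottle
{
    // Hypothetical helper: runs work(n) for each n in [0, total),
    // with at most `throttle` invocations in flight at any time.
    public static async Task RunThrottledAsync(int total, int throttle, Func<int, Task> work)
    {
        using (var gate = new SemaphoreSlim(throttle))
        {
            // ToList() starts every wrapper immediately, but each wrapper
            // waits at the gate until a slot is free before calling work(n).
            List<Task> tasks = Enumerable.Range(0, total).Select(async n =>
            {
                await gate.WaitAsync();
                try
                {
                    await work(n);
                }
                finally
                {
                    gate.Release(); // free the slot even if work(n) throws
                }
            }).ToList();

            await Task.WhenAll(tasks); // all wrappers must complete
        }
    }
}
```

With the signatures from the question, a call might look like: await SemaphoreThrottle.RunThrottledAsync(totalThreads, throttle, DoSomething);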
Root(大扎)
Reply #6 · 2019-02-15 17:54

The simplest option IMO is to use TPL Dataflow. You create an ActionBlock, limit it to the desired degree of parallelism, and start posting items into it. It runs at most that number of tasks at the same time, and when a task completes, it starts executing the next item:

async Task RunAsync(int totalThreads, int throttle) 
{
    var block = new ActionBlock<int>(
        DoSomething,
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = throttle });

    for (var n = 0; n < totalThreads; n++)
    {
        block.Post(n);
    }

    block.Complete();
    await block.Completion;
}