Await multiple async Task while setting max runnin

Posted 2019-01-26 17:57

Question:

So I just started to try and understand async, Task, lambda and so on, and I am unable to get it to work like I want. With the code below I want it to lock btnDoWebRequest, do an unknown number of WebRequests as Tasks, and once all the Tasks are done, unlock btnDoWebRequest. However, I only want a maximum of 3 (or whatever number I set) Tasks running at one time, which I got partly from "Have a set of Tasks with only X running at a time".

But after trying and modifying my code in multiple ways, it always immediately jumps back and re-enables btnDoWebRequest. Of course VS is warning me about needing awaits, currently at ".ContinueWith((task)" and at the async in "await Task.WhenAll(requestInfoList .Select(async i =>", but I can't seem to work out where or how to put in the needed awaits. Of course, as I'm still learning, there is a good chance I am going about this all wrong and the whole thing needs to be reworked. So any help would be greatly appreciated.

Thanks

    private SemaphoreSlim maxThread = new SemaphoreSlim(3);
    private void btnDoWebRequest_Click(object sender, EventArgs e)
    {
        btnDoWebRequest.Enabled = false;
        Task.Factory.StartNew(async () => await DoWebRequest()).Wait();
        btnDoWebRequest.Enabled = true;
    }

    private async Task DoWebRequest()
    {
        List<requestInfo> requestInfoList = new List<requestInfo>();
        for (int i = 0; dataRequestInfo.RowCount - 1 > i; i++)
        {
            requestInfoList.Add((requestInfo)dataRequestInfo.Rows[i].Tag);
        }
        await Task.WhenAll(requestInfoList .Select(async i => 
        {
            maxThread.Wait();
            Task.Factory.StartNew(() =>
            {
                var task = Global.webRequestWork(i);
            }, TaskCreationOptions.LongRunning).ContinueWith((task) => maxThread.Release());
        }));
    }

Answer 1:

First, don't use Task.Factory.StartNew by default. In fact, this should be avoided in async code. If you need to execute code on a background thread, then use Task.Run.

In your case, there's no need to use Task.Run (or Task.Factory.StartNew).
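To illustrate why Task.Factory.StartNew is a trap with async lambdas, here is a minimal console sketch. It is not the asker's code: SlowWorkAsync is a hypothetical stand-in for DoWebRequest, and the blocking Wait() calls are there only to mirror the question. With an async lambda, StartNew returns a Task<Task>, so waiting on it only waits until the lambda reaches its first await, which would explain why the button is re-enabled immediately.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class StartNewDemo
{
    // Hypothetical stand-in for the question's DoWebRequest(): takes about 500 ms.
    private static Task SlowWorkAsync() => Task.Delay(500);

    // Starts the work with StartNew or Task.Run, blocks on the returned task,
    // and reports how many milliseconds that Wait() took.
    public static long WaitMillis(bool useTaskRun)
    {
        var stopwatch = Stopwatch.StartNew();
        if (useTaskRun)
        {
            // Task.Run understands async lambdas and returns a task that
            // completes only when SlowWorkAsync has finished.
            Task.Run(async () => await SlowWorkAsync()).Wait();
        }
        else
        {
            // StartNew returns Task<Task>; the outer task completes as soon as
            // the lambda reaches its first await, long before the work is done.
            Task<Task> outer = Task.Factory.StartNew(async () => await SlowWorkAsync());
            outer.Wait();
        }
        return stopwatch.ElapsedMilliseconds;
    }

    public static void Main()
    {
        Console.WriteLine($"StartNew + Wait: {WaitMillis(useTaskRun: false)} ms");
        Console.WriteLine($"Task.Run + Wait: {WaitMillis(useTaskRun: true)} ms");
    }
}
```

The StartNew figure should be far below 500 ms, while the Task.Run figure should be roughly the full delay. In a UI event handler you would await instead of calling Wait() in either case.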

Start at the lowest level and work your way up. You already have an asynchronous web-requesting method, which I'll rename to WebRequestWorkAsync to follow the Task-based Asynchronous Pattern naming guidelines.

Next, throttle it by using the asynchronous APIs on SemaphoreSlim:

await maxThread.WaitAsync();
try
{
  await Global.WebRequestWorkAsync(i);
}
finally
{
  maxThread.Release();
}

Do that for each request info (note that no background thread is required):

private async Task DoWebRequestsAsync()
{
  List<requestInfo> requestInfoList = new List<requestInfo>();
  for (int i = 0; dataRequestInfo.RowCount - 1 > i; i++)
  {
    requestInfoList.Add((requestInfo)dataRequestInfo.Rows[i].Tag);
  }
  await Task.WhenAll(requestInfoList.Select(async i => 
  {
    await maxThread.WaitAsync();
    try
    {
      await Global.WebRequestWorkAsync(i);
    }
    finally
    {
      maxThread.Release();
    }
  }));
}

Finally, call this from your UI (again, no background thread is required):

private async void btnDoWebRequest_Click(object sender, EventArgs e)
{
  btnDoWebRequest.Enabled = false;
  await DoWebRequestsAsync();
  btnDoWebRequest.Enabled = true;
}

In summary, only use Task.Run when you need to; do not use Task.Factory.StartNew, and do not use Wait (use await instead). I have an async intro on my blog with more information.
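As a way to check the throttling pattern outside a WinForms project, here is a minimal console sketch. The names ThrottleDemo, FakeWebRequestAsync, and RunAsync are hypothetical, and Task.Delay stands in for the real web request; the sketch only assumes the same SemaphoreSlim.WaitAsync / Release pairing shown above and records the highest number of requests in flight at once.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    // Hypothetical stand-in for Global.WebRequestWorkAsync: just waits 100 ms.
    private static Task FakeWebRequestAsync(int id) => Task.Delay(100);

    // Runs `count` fake requests with at most `limit` in flight and returns
    // the highest number of requests observed running at the same time.
    public static async Task<int> RunAsync(int count, int limit)
    {
        var throttle = new SemaphoreSlim(limit);
        var gate = new object();
        int running = 0;
        int peak = 0;

        await Task.WhenAll(Enumerable.Range(0, count).Select(async id =>
        {
            // Asynchronously wait for a free slot; no thread is blocked here.
            await throttle.WaitAsync();
            try
            {
                lock (gate)
                {
                    running++;
                    peak = Math.Max(peak, running);
                }
                await FakeWebRequestAsync(id);
            }
            finally
            {
                lock (gate) { running--; }
                // Always give the slot back, even if the request throws.
                throttle.Release();
            }
        }));

        return peak;
    }

    public static async Task Main()
    {
        int peak = await RunAsync(count: 10, limit: 3);
        Console.WriteLine($"Peak concurrency: {peak}");
    }
}
```

The reported peak should never exceed the limit passed to the semaphore, and no Task.Run or background thread is involved.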



Answer 2:

There are a couple of things wrong with your code:

  1. Using Wait() on a Task is like running things synchronously: it blocks the UI thread, so you only see the UI react once the wait is over and the button is re-enabled. You need to await an async method for it to truly run asynchronously. Moreover, if a method does IO-bound work such as a web request, spinning up a new thread pool thread (using Task.Factory.StartNew) is redundant and a waste of resources.

  2. Your button click event handler needs to be marked with async so you can await inside your method.

  3. I've cleaned up your code a bit for clarity, using SemaphoreSlim's WaitAsync and replacing your for loop with a LINQ query. You can take just the first two points and apply them to your code.

    private SemaphoreSlim maxThread = new SemaphoreSlim(3);

    private async void btnDoWebRequest_Click(object sender, EventArgs e)
    {
        btnDoWebRequest.Enabled = false;
        await DoWebRequest();
        btnDoWebRequest.Enabled = true;
    }

    private async Task DoWebRequest()
    {
        // Assumes dataRequestInfo is a DataGridView; its Rows collection is
        // non-generic, so Cast is needed before LINQ. OfType skips rows
        // without a requestInfo Tag (such as the empty new-row placeholder).
        var requestInfoList = dataRequestInfo.Rows
            .Cast<DataGridViewRow>()
            .Select(row => row.Tag)
            .OfType<requestInfo>();

        var tasks = requestInfoList.Select(async i =>
        {
            // Asynchronously wait for one of the 3 slots.
            await maxThread.WaitAsync();
            try
            {
                await Global.webRequestWork(i);
            }
            finally
            {
                maxThread.Release();
            }
        });

        await Task.WhenAll(tasks);
    }


Answer 3:

I have created an extension method for this.

It can be used like this:

var tt = new List<Func<Task>>()
{
    () => Task.Delay(300), // Task.Delay can be replaced by your own async work, like calling the website
    () => Task.Delay(800),
    () => Task.Delay(250),
    () => Task.Delay(1000),
    () => Task.Delay(100),
    () => Task.Delay(200),
};
await tt.WhenAll(3); // runs at most 3 actions at a time; when one ends, the next starts, until all are finished

The extension method:

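using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class TaskExtension
{
    // Runs the given async actions with at most threadCount of them in flight at once.
    public static async Task WhenAll(this List<Func<Task>> actions, int threadCount)
    {
        var throttler = new SemaphoreSlim(threadCount);
        var running = new List<Task>(actions.Count);

        foreach (Func<Task> action in actions)
        {
            // Asynchronously wait for a free slot before starting the next action.
            await throttler.WaitAsync();

            running.Add(Task.Run(async () =>
            {
                try
                {
                    await action();
                }
                finally
                {
                    // Free the slot even if the action throws.
                    throttler.Release();
                }
            }));
        }

        // Asynchronously wait for every started action; the calling thread is
        // not blocked, and any exception from an action surfaces here.
        await Task.WhenAll(running);
    }
}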


Answer 4:

We can easily achieve this using SemaphoreSlim. Extension method I've created:

    /// <summary>
    /// Concurrently executes async actions for each item of <see cref="IEnumerable{T}"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable{T}"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxActionsToRunInParallel">Optional, max number of the actions to run in parallel,
    /// must be greater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxActionsToRunInParallel = null)
    {
        if (maxActionsToRunInParallel.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxActionsToRunInParallel.Value, maxActionsToRunInParallel.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

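                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        try
                        {
                            await action(item);
                        }
                        finally
                        {
                            // action has finished (or thrown), so decrement the number of
                            // currently running tasks; otherwise a failed action would
                            // hold its slot forever and the loop could stall
                            semaphoreSlim.Release();
                        }
                    }));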
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Sample Usage:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);